[SPARK-40852][CONNECT][PYTHON] Introduce StatFunction in proto and implement DataFrame.summary #38318

Conversation
Force-pushed from 329a2b8 to 9897cdb
Title changed to: Introduce DataFrameFunction in proto and implement DataFrame.summary
rel.getFunctionCase match {
  case proto.DataFrameFunction.FunctionCase.SUMMARY =>
    StatFunctions
      .summary(Dataset.ofRows(session, child), rel.getSummary.getStatisticsList.asScala.toSeq)
This is fine for now, but it's going to truncate the SQL plans, which disables further optimization. We should probably add a dedicated plan for def summary in Dataset itself.

For now, LGTM
Yes, then it will have more room for optimization. Let us add a new plan for it. Thanks.
+1!
I don't know how to add a new plan. It would be very useful to have a PR as an example.
cc @cloud-fan @HyukjinKwon: since we have reimplemented df.summary in 6a0713a, are there any differences in SQL optimization between this approach (directly invoking df.summary) and adding a dedicated plan?
Some rules may not work as they don't recognize the new plan.
What do you mean by "truncate the SQL plans"? DataFrame transformations just accumulate the logical plan.
@cloud-fan the old df.summary eagerly computes the statistics and always returns a LocalRelation.
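A quick way to observe this behavior, as an illustrative sketch (assumes a running SparkSession; not code from this PR):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).selectExpr("id", "id * 1.5 AS v")

# With the old eager implementation the analyzed plan is essentially a
# LocalRelation holding precomputed rows; with the re-implementation it is
# a regular lazy aggregation plan.
df.summary("count", "min", "max").explain(extended=True)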
Oh, that's an issue. Can it be solved by updating the df.summary implementation?
yes, it has been resolved
In the new impl
Force-pushed from 1787ea8 to 4727094
Force-pushed from 4727094 to 10c0968
Title changed from "Introduce DataFrameFunction in proto and implement DataFrame.summary" to "Introduce StatFunction in proto and implement DataFrame.summary"
@@ -323,6 +323,14 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) ->

    def where(self, condition: Expression) -> "DataFrame":
        return self.filter(condition)

    def summary(self, *statistics: str) -> "DataFrame":
        _statistics: List[str] = list(statistics)
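For context, a rough sketch of how the rest of this client-side method could plug into the plan machinery. The StatFunction plan node is inferred from the plan.py snippet quoted later in this thread, and withPlan, self._plan, self._session, and the keyword names are assumptions, not the exact code that was merged:

    def summary(self, *statistics: str) -> "DataFrame":
        _statistics: List[str] = list(statistics)
        assert all(isinstance(s, str) for s in _statistics)
        # Hypothetical wiring: wrap the current plan in a StatFunction node and
        # return it as a new Connect DataFrame. Names are assumptions.
        return DataFrame.withPlan(
            plan.StatFunction(self._plan, "summary", statistics=_statistics),
            session=self._session,
        )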
This is different from classic PySpark:

spark/python/pyspark/sql/dataframe.py, lines 2575 to 2576 in 29e4552:

if len(statistics) == 1 and isinstance(statistics[0], list):
    statistics = statistics[0]
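For comparison, a minimal standalone sketch (not code from either implementation) of the normalization that classic PySpark performs and the Connect version above omits:

from typing import List, Union

def normalize_statistics(*statistics: Union[str, List[str]]) -> List[str]:
    # Classic PySpark also accepts a single list argument, e.g. summary(["count", "min"]);
    # the Connect implementation above only accepts individual string arguments.
    if len(statistics) == 1 and isinstance(statistics[0], list):
        return list(statistics[0])
    return list(statistics)

print(normalize_statistics("count", "min"))    # ['count', 'min']
print(normalize_statistics(["count", "min"]))  # ['count', 'min']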
LGTM with one comment
@@ -323,6 +323,14 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) ->

    def where(self, condition: Expression) -> "DataFrame":
        return self.filter(condition)

    def summary(self, *statistics: str) -> "DataFrame":
        _statistics: List[str] = list(statistics)
        assert all(isinstance(s, str) for s in _statistics)
Given that def summary(self, *statistics: str) -> "DataFrame": is a public API, there could be some misuse passing non-str parameters for statistics. Is it common practice to just assert without giving a message?
In contrast, I guess the assert in plan.py is ok because that is an internal API: we can implement it correctly and just assert on unexpected things (and a developer can fix it if it really happens).

def __init__(self, child: Optional["LogicalPlan"], function: str, **kwargs: Any) -> None:
    super().__init__(child)
    assert function in ["summary"]
    self.function = function
Agree, it should give an error message; will update.
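As an illustration only (not necessarily the code that was eventually pushed), the public-API check could raise a descriptive error instead of a bare assert:

from typing import List, Tuple

def validate_statistics(statistics: Tuple[str, ...]) -> List[str]:
    # Reject misuse of the public API with an explicit message,
    # e.g. df.summary(1) or df.summary(["count"]).
    for s in statistics:
        if not isinstance(s, str):
            raise TypeError(
                f"'statistics' must be strings, but got {type(s).__name__}: {s!r}"
            )
    return list(statistics)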
oneof function {
  Summary summary = 2;

  Unknown unknown = 999;
why do we need this?
This follows https://github.com/apache/spark/blob/master/connector/connect/src/main/protobuf/spark/connect/relations.proto#L51 to catch unexpected input.
I think that's for an enum, but here it is an optional field... cc @amaliujia
Question: will we add new functions under this oneof?
Yes, such as crosstab, cov, corr, etc.
ok then this makes sense
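For reference, the classic PySpark counterparts of the functions mentioned above (these APIs exist today; their Connect proto entries would come in follow-ups):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 10.0, "a"), (2, 20.0, "b"), (3, 30.0, "a")], ["id", "value", "tag"]
)

df.stat.crosstab("id", "tag").show()  # contingency table of the two columns
print(df.stat.cov("id", "value"))     # sample covariance of two numeric columns
print(df.stat.corr("id", "value"))    # Pearson correlation of two numeric columns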
import org.apache.spark.sql.connect.dsl.commands._
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}

class SparkConnectStatFunctionSuite
why do we need a new test suite?
I encountered some problems adding a test to the existing suites, since summary requires a session and needs to analyze the plan; this suite will also cover some eagerly computed stat functions (cov, corr) in the future.
let me take another look
Removed; added another test to the existing suites.
Initially I thought a separate suite was ok because that suite just executed the proto plan to check the expected result. Now it's changed to a plan-comparison-based test, which is ok.
The tests have improved a lot since I sent this PR :)
Force-pushed from 8770d9b to 8e7f85b
thanks, merging to master!
thanks @cloud-fan @HyukjinKwon @amaliujia for reviews!
Closes apache#38318 from zhengruifeng/connect_df_summary.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

What changes were proposed in this pull request?

Implement `DataFrame.summary`.

There is a set of DataFrame APIs implemented in [`StatFunctions`](https://github.com/apache/spark/blob/9cae423075145d3dd81d53f4b82d4f2af6fe7c15/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala), [`DataFrameStatFunctions`](https://github.com/apache/spark/blob/b69c26833c99337bb17922f21dd72ee3a12e0c0a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala) and [`DataFrameNaFunctions`](https://github.com/apache/spark/blob/5d74ace648422e7a9bff7774ac266372934023b9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala), which I think cannot be implemented in the Connect client because they:
1. depend on Catalyst's analysis (most of them);
2. ~~are implemented in RDD operations (like `summary`, `approxQuantile`)~~ (resolved by re-implementation);
3. ~~internally trigger jobs (like `summary`)~~ (resolved by re-implementation).

This PR introduces a new proto `StatFunction` to support `StatFunctions` methods.

Why are the changes needed?

For Connect API coverage.

Does this PR introduce any user-facing change?

Yes, new API.

How was this patch tested?

Added UT.
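For reference, a short usage sketch of the API this PR brings to the Connect client (shown here with a regular SparkSession; the DataFrame.summary surface is the same):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 30.0)], ["id", "value"])

# Default statistics: count, mean, stddev, min, 25%, 50%, 75%, max
df.summary().show()

# Only the requested statistics
df.summary("count", "min", "max").show()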