Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-43082][CONNECT][PYTHON] Arrow-optimized Python UDFs in Spark Connect #40725

Closed
wants to merge 13 commits into from

Conversation

xinrong-meng
Copy link
Member

@xinrong-meng xinrong-meng commented Apr 10, 2023

What changes were proposed in this pull request?

Implement Arrow-optimized Python UDFs in Spark Connect.

Please see #39384 for motivation and performance improvements of Arrow-optimized Python UDFs.

Why are the changes needed?

Parity with vanilla PySpark.

Does this PR introduce any user-facing change?

Yes. In Spark Connect Python Client, users can:

  1. Set useArrow parameter True to enable Arrow optimization for a specific Python UDF.
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+                                                                  
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
  1. Enable spark.sql.execution.pythonUDF.arrow.enabled Spark Conf to make all Python UDFs Arrow-optimized.
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+                                                                  
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)#28]
+- ArrowEvalPython [<lambda>(id#26L)#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)

How was this patch tested?

Parity unit tests.

SPARK-40307

else:
return regular_udf


def _create_arrow_py_udf(f, regular_udf): # type: ignore
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring the type annotations of _create_arrow_py_udf because it is shared between vanilla PySpark and Spark Connect Python Client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function is only an extraction of original code L142 - L179 for code reuse.

else useArrow
)

regular_udf = _create_udf(f, returnType, evalType)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is duplicated code in _create_py_udf between Spark Connect Python Client and vanilla PySpark, except for fetching the active SparkSession.
However, for a clear code path separation and abstraction, I decided not to refactor it for now.

@xinrong-meng
Copy link
Member Author

CI failed because of

Run echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV
fatal: detected dubious ownership in repository at '/__w/spark/spark'
To add an exception for this directory, call:

	git config --global --add safe.directory /__w/spark/spark
fatal: detected dubious ownership in repository at '/__w/spark/spark'
To add an exception for this directory, call:

	git config --global --add safe.directory /__w/spark/spark
Error: Process completed with exit code 128.

@xinrong-meng xinrong-meng force-pushed the connect_arrow_py_udf branch from 95cad25 to f6fc6e1 Compare April 17, 2023 20:56
@xinrong-meng
Copy link
Member Author

@HyukjinKwon @zhengruifeng Would you please take a look? Thank you!

@HyukjinKwon
Copy link
Member

cc @ueshin FYI

import pandas as pd
from pyspark.sql.pandas.functions import _create_pandas_udf

return_type = regular_udf.returnType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems that the regular_udf is only used to pass the returnType and evalType ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And regular_udf.func based on the updated code.

@zhengruifeng zhengruifeng changed the title [SPARK-43082][Connect][PYTHON] Arrow-optimized Python UDFs in Spark Connect [SPARK-43082][CONNECT][PYTHON] Arrow-optimized Python UDFs in Spark Connect Apr 20, 2023
@HyukjinKwon
Copy link
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants