[SPARK-44746][PYTHON] Add more Python UDTF documentation for functions that accept input tables

### What changes were proposed in this pull request?

This PR adds more Python UDTF documentation for functions that accept input tables.
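For context, here is a minimal sketch of the feature these docs cover (the `EchoUDTF` and `echo_udtf` names and the inline input table are illustrative, not taken from this PR): a Python UDTF registered with Spark can be invoked from SQL with a `TABLE(...)` argument, so the function consumes an entire relation rather than scalar values.

```python
# Hypothetical sketch, assuming an active SparkSession bound to `spark`.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="x: int")
class EchoUDTF:
    def eval(self, row: Row):
        # Emit each input row unchanged.
        yield (row["x"],)

spark.udtf.register("echo_udtf", EchoUDTF)

# The TABLE(...) argument passes a whole relation to the function call.
spark.sql("SELECT * FROM echo_udtf(TABLE(SELECT * FROM VALUES (1), (2) AS t(x)))").show()
```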

### Why are the changes needed?

This functionality was added recently but was not yet covered in the docs.

### Does this PR introduce _any_ user-facing change?

No, it's a documentation-only change.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45375 from dtenedor/update-udtf-docs.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
dtenedor authored and HyukjinKwon committed Mar 6, 2024

1 parent 4743103 commit e0d7910
Showing 2 changed files with 302 additions and 68 deletions.
80 changes: 80 additions & 0 deletions examples/src/main/python/sql/udtf.py
@@ -210,6 +210,83 @@ def eval(self, row: Row):
# +---+


def python_udtf_table_argument_with_partitioning(spark: SparkSession) -> None:
    from pyspark.sql.functions import udtf
    from pyspark.sql.types import Row

    # Define and register a UDTF that tracks, within each partition of input
    # rows, the most recent value of column `a` and the running maximum of
    # column `b`.
    @udtf(returnType="a: string, b: int")
    class FilterUDTF:
        def __init__(self):
            self.key = ""
            self.max = 0

        def eval(self, row: Row):
            self.key = row["a"]
            self.max = max(self.max, row["b"])

        def terminate(self):
            # Emit a single (key, max) row per partition.
            yield self.key, self.max

    spark.udtf.register("filter_udtf", FilterUDTF)

    # Create an input table with some example values.
    spark.sql("DROP TABLE IF EXISTS values_table")
    spark.sql("CREATE TABLE values_table (a STRING, b INT)")
    spark.sql("INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)")
    spark.table("values_table").show()
    # +-------+----+
    # | a     | b  |
    # +-------+----+
    # | "abc" | 2  |
    # | "abc" | 4  |
    # | "def" | 6  |
    # | "def" | 8  |
    # +-------+----+

    # Query the UDTF with the input table as an argument, and a directive to partition the input
    # rows such that all rows with each unique value of the `a` column are processed by the same
    # instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
    spark.sql("""
        SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1
        """).show()
    # +-------+----+
    # | a     | b  |
    # +-------+----+
    # | "abc" | 4  |
    # | "def" | 8  |
    # +-------+----+
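    # Note: `terminate()` yields exactly once per partition, so each group of
    # rows sharing a value of `a` collapses to a single row carrying the
    # maximum `b` observed within that group.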

    # Query the UDTF with the input table as an argument, and a directive to partition the input
    # rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
    # processed by the same instance of the UDTF class. Within each partition, the rows are ordered
    # by the `b` column.
    spark.sql("""
        SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1
        """).show()
    # +-------+----+
    # | a     | b  |
    # +-------+----+
    # | "def" | 8  |
    # +-------+----+
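    # Here LENGTH('abc') == LENGTH('def') == 3, so all four rows land in one
    # partition; ordered by `b`, the last key seen is 'def', and the overall
    # maximum of `b` is 8.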

    # Query the UDTF with the input table as an argument, and a directive to consider all the input
    # rows in a single partition such that exactly one instance of the UDTF class consumes all of
    # the input rows. Within each partition, the rows are ordered by the `b` column.
    spark.sql("""
        SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1
        """).show()
    # +-------+----+
    # | a     | b  |
    # +-------+----+
    # | "def" | 8  |
    # +-------+----+
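    # With a single partition, the result matches the LENGTH(a) case above:
    # one UDTF instance sees all four rows and reports the last key alongside
    # the global maximum of `b`.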

    # Clean up.
    spark.sql("DROP TABLE values_table")


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
@@ -237,4 +314,7 @@ def eval(self, row: Row):
print("Running Python UDTF table argument example")
python_udtf_table_argument(spark)

print("Running Python UDTF table argument with partitioning example")
python_udtf_table_argument_with_partitioning(spark)

spark.stop()
