[DOP-13846] - implement XML.parse_column (#269)
* [DOP-13846] - implement XML.parse_column

* [DOP-13845] - add xml documentation

* [DOP-13846] - add bypass test and note for rootTag

* [DOP-13846] - update test for xml parse column

* [DOP-13846] - update test for xml parse column

* [DOP-13846] - update test for xml parse column

* [DOP-13846] - update parse_column note
maxim-lixakov authored May 3, 2024
1 parent 5417dc5 commit 076d073
Showing 5 changed files with 215 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/269.feature.rst
@@ -0,0 +1 @@
Add ``XML.parse_column`` method for handling XML data within Spark. This method allows for direct parsing of XML strings into structured Spark DataFrame columns.
53 changes: 53 additions & 0 deletions docs/connection/db_connection/kafka/format_handling.rst
@@ -242,3 +242,56 @@ To serialize structured data into Avro format and write it back to a Kafka topic
# | 1|[02 02 02 08 76 6... (binary data)] |
# | 2|[02 04 02 08 76 6... (binary data)] |
# +---+------------------------------------+
XML Format Handling
-------------------

Handling XML data from Kafka involves parsing string representations of XML into structured Spark DataFrame columns.

``DBReader``
~~~~~~~~~~~~

To process XML-formatted data from Kafka, use the :obj:`XML.parse_column <onetl.file.format.xml.XML.parse_column>` method. It converts a column containing XML strings into a structured Spark column according to a specified schema.

.. code-block:: python

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    from onetl.db import DBReader
    from onetl.file.format import XML
    from onetl.connection import Kafka

    spark = SparkSession.builder.appName("KafkaXMLExample").getOrCreate()

    kafka = Kafka(...)
    xml = XML(row_tag="person")

    reader = DBReader(
        connection=kafka,
        topic="topic_name",
    )
    df = reader.run()

    df.show()
    # +----+---------------------------------------------------+---------+---------+------+-----------------------+-------------+
    # |key |value                                              |topic    |partition|offset|timestamp              |timestampType|
    # +----+---------------------------------------------------+---------+---------+------+-----------------------+-------------+
    # |[31]|"<person><name>Alice</name><age>20</age></person>"|topicXML |0        |0     |2024-04-24 13:02:25.911|0            |
    # |[32]|"<person><name>Bob</name><age>25</age></person>"  |topicXML |0        |1     |2024-04-24 13:02:25.922|0            |
    # +----+---------------------------------------------------+---------+---------+------+-----------------------+-------------+

    xml_schema = StructType(
        [
            StructField("name", StringType(), nullable=True),
            StructField("age", IntegerType(), nullable=True),
        ]
    )
    parsed_xml_df = df.select(xml.parse_column("value", xml_schema))
    parsed_xml_df.show()
    # +-----------+
    # |value      |
    # +-----------+
    # |{Alice, 20}|
    # |{Bob, 25}  |
    # +-----------+
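
The parsed column is a struct named after the source column (``value`` in this example). If you need the
individual fields as top-level columns, you can expand the struct with standard Spark column syntax
(an illustrative follow-up, not part of this commit):

.. code-block:: python

    # expand the parsed struct into separate top-level columns
    flat_df = parsed_xml_df.select("value.*")
    flat_df.show()
    # +-----+---+
    # | name|age|
    # +-----+---+
    # |Alice| 20|
    # |  Bob| 25|
    # +-----+---+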
2 changes: 1 addition & 1 deletion docs/file_df/file_formats/xml.rst
@@ -6,4 +6,4 @@ XML
.. currentmodule:: onetl.file.format.xml

.. autoclass:: XML
:members: get_packages
:members: get_packages, parse_column
113 changes: 112 additions & 1 deletion onetl/file/format/xml.py
@@ -19,7 +19,8 @@
from onetl.hooks import slot, support_hooks

if TYPE_CHECKING:
    from pyspark.sql import SparkSession
    from pyspark.sql import Column, SparkSession
    from pyspark.sql.types import StructType


PROHIBITED_OPTIONS = frozenset(
@@ -226,3 +227,113 @@ def check_if_supported(self, spark: SparkSession) -> None:
            if log.isEnabledFor(logging.DEBUG):
                log.debug("Missing Java class", exc_info=e, stack_info=True)
            raise ValueError(msg) from e

    def parse_column(self, column: str | Column, schema: StructType) -> Column:
        """
        Parses an XML string column into a structured Spark SQL column using the ``from_xml`` function
        provided by the `Databricks Spark XML library <https://github.com/databricks/spark-xml#pyspark-notes>`_,
        based on the provided schema.

        .. note::

            This method assumes that the ``spark-xml`` package is installed, see :obj:`XML.get_packages <onetl.file.format.xml.XML.get_packages>`.

        .. note::

            This method parses each DataFrame row individually, so for the chosen column each row must contain
            exactly one occurrence of the specified ``rowTag``. If your XML data includes a root tag that
            encapsulates multiple row tags, you can adjust the schema to use an ``ArrayType`` to keep all
            child elements under the single root:

            .. code-block:: xml

                <books>
                    <book><title>Book One</title><author>Author A</author></book>
                    <book><title>Book Two</title><author>Author B</author></book>
                </books>

            And the corresponding schema in Spark using an ``ArrayType``:

            .. code-block:: python

                from pyspark.sql.types import StructType, StructField, ArrayType, StringType

                schema = StructType(
                    [
                        StructField(
                            "book",
                            ArrayType(
                                StructType(
                                    [
                                        StructField("title", StringType(), True),
                                        StructField("author", StringType(), True),
                                    ]
                                )
                            ),
                            True,
                        )
                    ]
                )

        Parameters
        ----------
        column : str | Column
            The name of the column or the Column object containing XML strings to parse.

        schema : StructType
            The expected schema of the parsed XML data.

        Returns
        -------
        Column
            A new Column object with the data parsed from an XML string into the specified structured format.

        Examples
        --------
        .. code-block:: python

            from pyspark.sql import SparkSession
            from pyspark.sql.types import StructType, StructField, StringType, IntegerType

            from onetl.file.format import XML

            spark = SparkSession.builder.appName("XMLParsingExample").getOrCreate()

            schema = StructType(
                [
                    StructField("author", StringType(), nullable=True),
                    StructField("title", StringType(), nullable=True),
                    StructField("genre", StringType(), nullable=True),
                    StructField("price", IntegerType(), nullable=True),
                ]
            )

            xml_processor = XML(row_tag="book")

            data = [
                (
                    "<book><author>Austen, Jane</author><title>Pride and Prejudice</title><genre>romance</genre><price>19</price></book>",
                )
            ]
            df = spark.createDataFrame(data, ["xml_string"])

            parsed_df = df.select(xml_processor.parse_column("xml_string", schema=schema))
            parsed_df.show()
        """
        from pyspark.sql import Column, SparkSession  # noqa: WPS442

        spark = SparkSession._instantiatedSession  # noqa: WPS437
        self.check_if_supported(spark)

        from pyspark.sql.column import _to_java_column  # noqa: WPS450
        from pyspark.sql.functions import col

        if isinstance(column, Column):
            column_name, column = column._jc.toString(), column.cast("string")  # noqa: WPS437
        else:
            column_name, column = column, col(column).cast("string")

        java_column = _to_java_column(column)
        java_schema = spark._jsparkSession.parseDataType(schema.json())  # noqa: WPS437
        scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(  # noqa: WPS219, WPS437
            self.dict(),
        )
        jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(  # noqa: WPS219, WPS437
            java_column,
            java_schema,
            scala_options,
        )
        return Column(jc).alias(column_name)
@@ -4,16 +4,21 @@
Do not test all the possible options and combinations, we are not testing Spark here.
"""

import datetime

import pytest

from onetl._util.spark import get_spark_version
from onetl.file import FileDFReader, FileDFWriter
from onetl.file.format import XML

try:
    from pyspark.sql import Row
    from pyspark.sql.functions import col

    from tests.util.assert_df import assert_equal_df
except ImportError:
    pytest.skip("Missing pandas", allow_module_level=True)
    pytest.skip("Missing pandas or pyspark", allow_module_level=True)

pytestmark = [pytest.mark.local_fs, pytest.mark.file_df_connection, pytest.mark.connection, pytest.mark.xml]

@@ -166,3 +171,45 @@ def test_xml_reader_with_attributes(
    assert read_df.count()
    assert read_df.schema == expected_xml_attributes_df.schema
    assert_equal_df(read_df, expected_xml_attributes_df, order_by="id")


@pytest.mark.parametrize(
    "xml_input, expected_row",
    [
        (
            """<item>
                <id>1</id>
                <str_value>Alice</str_value>
                <int_value>123</int_value>
                <date_value>2021-01-01</date_value>
                <datetime_value>2021-01-01T07:01:01Z</datetime_value>
                <float_value>1.23</float_value>
            </item>""",
            Row(
                xml_string=Row(
                    id=1,
                    str_value="Alice",
                    int_value=123,
                    date_value=datetime.date(2021, 1, 1),
                    datetime_value=datetime.datetime(2021, 1, 1, 7, 1, 1),
                    float_value=1.23,
                ),
            ),
        ),
    ],
    ids=["basic-case"],
)
@pytest.mark.parametrize("column_type", [str, col])
def test_xml_parse_column(spark, xml_input: str, expected_row: Row, column_type, file_df_schema):
    from onetl.file.format import XML

    spark_version = get_spark_version(spark)
    if spark_version.major < 3:
        pytest.skip("XML files are supported on Spark 3.x only")

    xml = XML(row_tag="item")
    df = spark.createDataFrame([(xml_input,)], ["xml_string"])
    parsed_df = df.select(xml.parse_column(column_type("xml_string"), schema=file_df_schema))
    result_row = parsed_df.first()

    assert result_row == expected_row
