From 550ca5c8ede46d7134ea96836da4e112d72be7a7 Mon Sep 17 00:00:00 2001
From: maxim-lixakov
Date: Thu, 5 Oct 2023 18:46:14 +0300
Subject: [PATCH 01/14] [DOP-9645] - Add XML file format

---
 docs/changelog/next_release/162.feature.rst   |   1 +
 docs/file_df/file_formats/index.rst           |   1 +
 docs/file_df/file_formats/xml.rst             |   9 +
 onetl/file/format/__init__.py                 |   1 +
 onetl/file/format/xml.py                      | 243 ++++++++++++++++++
 .../connections/file_df_connections.py        |  63 ++++-
 tests/fixtures/spark.py                       |   3 +-
 .../file_df_connection/generate_files.py      |  57 ++++
 .../with_attributes/file_with_attributes.xml  |   1 +
 .../xml/with_compression/file.xml.gz          | Bin 0 -> 294 bytes
 .../xml/without_compression/file.xml          |   1 +
 .../test_xml_integration.py                   |  90 +++++++
 .../test_format_unit/test_xml_unit.py         |  79 ++++++
 13 files changed, 541 insertions(+), 8 deletions(-)
 create mode 100644 docs/changelog/next_release/162.feature.rst
 create mode 100644 docs/file_df/file_formats/xml.rst
 create mode 100644 onetl/file/format/xml.py
 create mode 100644 tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml
 create mode 100644 tests/resources/file_df_connection/xml/with_compression/file.xml.gz
 create mode 100644 tests/resources/file_df_connection/xml/without_compression/file.xml
 create mode 100644 tests/tests_integration/test_file_format_integration/test_xml_integration.py
 create mode 100644 tests/tests_unit/test_file/test_format_unit/test_xml_unit.py

diff --git a/docs/changelog/next_release/162.feature.rst b/docs/changelog/next_release/162.feature.rst
new file mode 100644
index 000000000..f6663484e
--- /dev/null
+++ b/docs/changelog/next_release/162.feature.rst
@@ -0,0 +1 @@
+Add ``XML`` file format support.
diff --git a/docs/file_df/file_formats/index.rst b/docs/file_df/file_formats/index.rst
index 3a39bc061..abaeee2d2 100644
--- a/docs/file_df/file_formats/index.rst
+++ b/docs/file_df/file_formats/index.rst
@@ -14,6 +14,7 @@ File Formats
     jsonline
     orc
     parquet
+    xml
 
 .. toctree::
     :maxdepth: 1
diff --git a/docs/file_df/file_formats/xml.rst b/docs/file_df/file_formats/xml.rst
new file mode 100644
index 000000000..187aa89a4
--- /dev/null
+++ b/docs/file_df/file_formats/xml.rst
@@ -0,0 +1,9 @@
+.. _xml-file-format:
+
+XML
+=====
+
+.. currentmodule:: onetl.file.format.xml
+
+.. autoclass:: XML
+    :members: get_packages
diff --git a/onetl/file/format/__init__.py b/onetl/file/format/__init__.py
index 0c9d6b742..74475c8a9 100644
--- a/onetl/file/format/__init__.py
+++ b/onetl/file/format/__init__.py
@@ -20,3 +20,4 @@
 from onetl.file.format.jsonline import JSONLine
 from onetl.file.format.orc import ORC
 from onetl.file.format.parquet import Parquet
+from onetl.file.format.xml import XML
diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py
new file mode 100644
index 000000000..feee20742
--- /dev/null
+++ b/onetl/file/format/xml.py
@@ -0,0 +1,243 @@
+# Copyright 2023 MTS (Mobile Telesystems)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, ClassVar + +from pydantic import Field + +from onetl._util.java import try_import_java_class +from onetl._util.scala import get_default_scala_version +from onetl._util.spark import get_spark_version +from onetl._util.version import Version +from onetl.exception import MISSING_JVM_CLASS_MSG +from onetl.file.format.file_format import ReadWriteFileFormat +from onetl.hooks import slot, support_hooks + +if TYPE_CHECKING: + from pyspark.sql import SparkSession + + +READ_OPTIONS = frozenset( + ( + "rowTag", + "samplingRatio", + "excludeAttribute", + "treatEmptyValuesAsNulls", + "mode", + "inferSchema", + "columnNameOfCorruptRecord", + "attributePrefix", + "valueTag", + "charset", + "ignoreSurroundingSpaces", + "wildcardColName", + "rowValidationXSDPath", + "ignoreNamespace", + "timestampFormat", + "dateFormat", + ), +) + +WRITE_OPTIONS = frozenset( + ( + "rowTag", + "rootTag", + "declaration", + "arrayElementName", + "nullValue", + "attributePrefix", + "valueTag", + "compression", + "timestampFormat", + "dateFormat", + ), +) + + +log = logging.getLogger(__name__) + + +@support_hooks +class XML(ReadWriteFileFormat): + """ + XML file format. |support_hooks| + + Based on `Databricks Spark XML `_ file format. + + Supports reading/writing files with ``.xml`` extension. + + .. versionadded:: 0.9.5 + + .. dropdown:: Version compatibility + + * Spark versions: 2.x.x - 3.4.x. + + .. warning:: + + Not all combinations of Spark version and package version are supported. + See `Maven index `_ + and `official documentation `_. + + Note that spark-xml is planned to become a part of Apache Spark 4.0, and this library will remain in maintenance mode for previous Spark versions. + + * Scala versions: + - Spark 2.0.x - 2.4.x: Scala 2.11 + - Spark 3.0.x - 3.1.x: Scala 2.12 + - Spark 3.2.x - 3.4.x: Scala 2.12, 2.13 + * Java versions: 8 - 20 + + + See documentation from link above. + + .. note :: + + You can pass any option to the constructor, even if it is not mentioned in this documentation. + **Option names should be in** ``camelCase``! + + The set of supported options depends on Spark version. See link above. + + Examples + -------- + Describe options how to read from/write to XML file with specific options: + + .. code:: python + + from onetl.file.format import XML + from pyspark.sql import SparkSession + + # Create Spark session with XML package loaded + maven_packages = XML.get_packages(spark_version="3.4.1") + spark = ( + SparkSession.builder.appName("spark-app-name") + .config("spark.jars.packages", ",".join(maven_packages)) + .getOrCreate() + ) + + xml = XML(row_tag="item", compression="gzip") + + """ + + name: ClassVar[str] = "xml" + + row_tag: str = Field(alias="rowTag") + + class Config: + known_options = READ_OPTIONS | WRITE_OPTIONS + extra = "allow" + + @slot + @classmethod + def get_packages( # noqa: WPS231 + cls, + spark_version: str, + scala_version: str | None = None, + package_version: str | None = None, + ) -> list[str]: + """ + Get package names to be downloaded by Spark. |support_hooks| + + .. warning:: + + Not all combinations of Spark version and package version are supported. + See `Maven index `_ + and `official documentation `_. + + Note that spark-xml is planned to become a part of Apache Spark 4.0, and this library will remain in maintenance mode for Spark 3.x versions. + + Parameters + ---------- + spark_version : str + Spark version in format ``major.minor.patch``. 
+ + scala_version : str, optional + Scala version in format ``major.minor``. + + If ``None``, ``spark_version`` is used to determine Scala version. + + version: str, optional + Package version in format ``major.minor.patch``. Default is ``0.17.0`` with Spark 3.x.x and ``0.13.0`` with Spark 2.x.x. + + .. note:: + + It is not guaranteed that custom package versions are supported. + Tests are performed only for default version. + + Examples + -------- + + .. code:: python + + from onetl.file.format import XML + + XML.get_packages(spark_version="3.4.1") + XML.get_packages(spark_version="3.4.1", scala_version="2.12") + XML.get_packages( + spark_version="3.4.1", + scala_version="2.12", + package_version="0.17.0", + ) + + """ + + spark_ver = Version.parse(spark_version) + + # Ensure compatibility with Spark and Scala versions + if spark_ver < (3, 0): + if not package_version: + version = Version.parse("0.13.0") # Last version supporting Spark 2.x + else: + log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) + version = Version.parse(package_version) + + if not scala_version: + scala_ver = Version.parse("2.11") + else: + scala_ver = Version.parse(scala_version) + if scala_ver < (2, 11) or scala_ver > (2, 11): + raise ValueError(f"For Spark 2.x, Scala version must be 2.11, got {scala_ver}") + else: + if not package_version: + version = Version.parse("0.17.0") + else: + log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) + version = Version.parse(package_version) + + if not scala_version: + scala_ver = get_default_scala_version(spark_ver) + else: + scala_ver = Version.parse(scala_version) + if scala_ver < (2, 12) or scala_ver > (2, 13): + raise ValueError(f"For Spark 3.x, Scala version must be 2.12 or 2.13, got {scala_ver}") + + return [f"com.databricks:spark-xml_{scala_ver.digits(2)}:{version.digits(3)}"] + + @slot + def check_if_supported(self, spark: SparkSession) -> None: + java_class = "com.databricks.spark.xml.XmlReader" + + try: + try_import_java_class(spark, java_class) + except Exception as e: + spark_version = get_spark_version(spark) + msg = MISSING_JVM_CLASS_MSG.format( + java_class=java_class, + package_source=self.__class__.__name__, + args=f"spark_version='{spark_version}'", + ) + if log.isEnabledFor(logging.DEBUG): + log.debug("Missing Java class", exc_info=e, stack_info=True) + raise ValueError(msg) from e diff --git a/tests/fixtures/connections/file_df_connections.py b/tests/fixtures/connections/file_df_connections.py index 3a9bea44a..faf3bb3b6 100644 --- a/tests/fixtures/connections/file_df_connections.py +++ b/tests/fixtures/connections/file_df_connections.py @@ -56,13 +56,62 @@ def file_df_schema_str_value_last(): @pytest.fixture() def file_df_dataframe(spark, file_df_schema): data = [ - [1, "val1", 123, datetime.date(2021, 1, 1), datetime.datetime(2021, 1, 1, 1, 1, 1), 1.23], - [2, "val1", 234, datetime.date(2022, 2, 2), datetime.datetime(2022, 2, 2, 2, 2, 2), 2.34], - [3, "val2", 345, datetime.date(2023, 3, 3), datetime.datetime(2023, 3, 3, 3, 3, 3), 3.45], - [4, "val2", 456, datetime.date(2024, 4, 4), datetime.datetime(2024, 4, 4, 4, 4, 4), 4.56], - [5, "val3", 567, datetime.date(2025, 5, 5), datetime.datetime(2025, 5, 5, 5, 5, 5), 5.67], - [6, "val3", 678, datetime.date(2026, 6, 6), datetime.datetime(2026, 6, 6, 6, 6, 6), 6.78], - [7, "val3", 789, datetime.date(2027, 7, 7), datetime.datetime(2027, 7, 7, 7, 7, 7), 7.89], + [ + 1, + "val1", + 123, + datetime.date(2021, 1, 1), 
+ datetime.datetime(2021, 1, 1, 1, 1, 1, tzinfo=datetime.timezone.utc), + 1.23, + ], + [ + 2, + "val1", + 234, + datetime.date(2022, 2, 2), + datetime.datetime(2022, 2, 2, 2, 2, 2, tzinfo=datetime.timezone.utc), + 2.34, + ], + [ + 3, + "val2", + 345, + datetime.date(2023, 3, 3), + datetime.datetime(2023, 3, 3, 3, 3, 3, tzinfo=datetime.timezone.utc), + 3.45, + ], + [ + 4, + "val2", + 456, + datetime.date(2024, 4, 4), + datetime.datetime(2024, 4, 4, 4, 4, 4, tzinfo=datetime.timezone.utc), + 4.56, + ], + [ + 5, + "val3", + 567, + datetime.date(2025, 5, 5), + datetime.datetime(2025, 5, 5, 5, 5, 5, tzinfo=datetime.timezone.utc), + 5.67, + ], + [ + 6, + "val3", + 678, + datetime.date(2026, 6, 6), + datetime.datetime(2026, 6, 6, 6, 6, 6, tzinfo=datetime.timezone.utc), + 6.78, + ], + [ + 7, + "val3", + 789, + datetime.date(2027, 7, 7), + datetime.datetime(2027, 7, 7, 7, 7, 7, tzinfo=datetime.timezone.utc), + 7.89, + ], ] return spark.createDataFrame(data, schema=file_df_schema) diff --git a/tests/fixtures/spark.py b/tests/fixtures/spark.py index a9799c7dd..357b6dd66 100644 --- a/tests/fixtures/spark.py +++ b/tests/fixtures/spark.py @@ -44,7 +44,7 @@ def maven_packages(): SparkS3, Teradata, ) - from onetl.file.format import Avro, Excel + from onetl.file.format import XML, Avro, Excel pyspark_version = get_pyspark_version() packages = ( @@ -54,6 +54,7 @@ def maven_packages(): + Oracle.get_packages() + Postgres.get_packages() + Teradata.get_packages() + + XML.get_packages(pyspark_version) ) with_greenplum = os.getenv("ONETL_DB_WITH_GREENPLUM", "false").lower() == "true" diff --git a/tests/resources/file_df_connection/generate_files.py b/tests/resources/file_df_connection/generate_files.py index 698c81ea7..21f3cf7fc 100755 --- a/tests/resources/file_df_connection/generate_files.py +++ b/tests/resources/file_df_connection/generate_files.py @@ -16,6 +16,7 @@ from pathlib import Path from tempfile import gettempdir from typing import TYPE_CHECKING, Any, Iterator, TextIO +from xml.etree import ElementTree # noqa: S405 from zipfile import ZipFile if TYPE_CHECKING: @@ -472,6 +473,61 @@ def save_as_xls(data: list[dict], path: Path) -> None: ) +def save_as_xml_plain(data: list[dict], path: Path) -> None: + path.mkdir(parents=True, exist_ok=True) + root = ElementTree.Element("root") + + for record in data: + item = ElementTree.SubElement(root, "item") + for key, value in record.items(): + child = ElementTree.SubElement(item, key) + child.text = str(value) + + tree = ElementTree.ElementTree(root) + tree.write(path / "file.xml") + + +def save_as_xml_with_attributes(data: list[dict], path: Path) -> None: + path.mkdir(parents=True, exist_ok=True) + root = ElementTree.Element("root") + + for record in data: + str_attributes = {key: str(value) for key, value in record.items()} + item = ElementTree.SubElement(root, "item", attrib=str_attributes) + for key, value in record.items(): + child = ElementTree.SubElement(item, key) + child.text = str(value) + + tree = ElementTree.ElementTree(root) + tree.write(str(path / "file_with_attributes.xml")) + + +def save_as_xml_gz(data: list[dict], path: Path) -> None: + path.mkdir(parents=True, exist_ok=True) + root = ElementTree.Element("root") + + for record in data: + item = ElementTree.SubElement(root, "item") + for key, value in record.items(): + child = ElementTree.SubElement(item, key) + child.text = str(value) + + ElementTree.ElementTree(root) + xml_string = ElementTree.tostring(root, encoding="utf-8") + + with gzip.open(path / "file.xml.gz", "wb", compresslevel=9) as f: + 
f.write(xml_string) + + +def save_as_xml(data: list[dict], path: Path) -> None: + root = path / "xml" + shutil.rmtree(root, ignore_errors=True) + + save_as_xml_plain(data, root / "without_compression") + save_as_xml_with_attributes(data, root / "with_attributes") + save_as_xml_gz(data, root / "with_compression") + + format_mapping = { "csv": save_as_csv, "json": save_as_json, @@ -481,6 +537,7 @@ def save_as_xls(data: list[dict], path: Path) -> None: "avro": save_as_avro, "xlsx": save_as_xlsx, "xls": save_as_xls, + "xml": save_as_xml, } diff --git a/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml b/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml new file mode 100644 index 000000000..9c170e560 --- /dev/null +++ b/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml @@ -0,0 +1 @@ +1val11232021-01-012021-01-01 01:01:01+00:001.232val12342022-02-022022-02-02 02:02:02+00:002.343val23452023-03-032023-03-03 03:03:03+00:003.454val24562024-04-042024-04-04 04:04:04+00:004.565val35672025-05-052025-05-05 05:05:05+00:005.676val36782026-06-062026-06-06 06:06:06+00:006.787val37892027-07-072027-07-07 07:07:07+00:007.89 \ No newline at end of file diff --git a/tests/resources/file_df_connection/xml/with_compression/file.xml.gz b/tests/resources/file_df_connection/xml/with_compression/file.xml.gz new file mode 100644 index 0000000000000000000000000000000000000000..9d255211fe04d2d530721df454470dd1e577d301 GIT binary patch literal 294 zcmV+>0onc^iwFpq)gEO6|7K}yWiEJaYyg$i?`py@6b0~C$#+S5o4+oEzRDE13Z4UYKHS=8K1)fm)AGxJ05~Rhuq_ad1E%IEH3rJ(v^bQ&WzwN{IJB{ZLvz5? z9HquU`1val11232021-01-012021-01-01 01:01:01+00:001.232val12342022-02-022022-02-02 02:02:02+00:002.343val23452023-03-032023-03-03 03:03:03+00:003.454val24562024-04-042024-04-04 04:04:04+00:004.565val35672025-05-052025-05-05 05:05:05+00:005.676val36782026-06-062026-06-06 06:06:06+00:006.787val37892027-07-072027-07-07 07:07:07+00:007.89 \ No newline at end of file diff --git a/tests/tests_integration/test_file_format_integration/test_xml_integration.py b/tests/tests_integration/test_file_format_integration/test_xml_integration.py new file mode 100644 index 000000000..acf58bf7c --- /dev/null +++ b/tests/tests_integration/test_file_format_integration/test_xml_integration.py @@ -0,0 +1,90 @@ +"""Integration tests for XML file format. + +Test only that options are passed to Spark in both FileDFReader & FileDFWriter. +Do not test all the possible options and combinations, we are not testing Spark here. 
+""" + +import pytest + +from onetl.file import FileDFReader, FileDFWriter +from onetl.file.format import XML + +try: + from tests.util.assert_df import assert_equal_df +except ImportError: + # pandas and spark can be missing if someone runs tests for file connections only + pass + +pytestmark = [pytest.mark.local_fs, pytest.mark.file_df_connection, pytest.mark.connection] + + +@pytest.mark.parametrize( + "path, options", + [ + ("without_compression", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX"}), + ("with_attributes", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "attributePrefix": "_"}), + ], + ids=["without_compression", "with_attributes"], +) +def test_xml_reader( + local_fs_file_df_connection_with_path_and_files, + file_df_dataframe, + path, + options, +): + """Reading XML files working as expected on any Spark, Python and Java versions""" + + local_fs, source_path, _ = local_fs_file_df_connection_with_path_and_files + df = file_df_dataframe + xml_root = source_path / "xml" / path + + reader = FileDFReader( + connection=local_fs, + format=XML.parse(options), + df_schema=df.schema, + source_path=xml_root, + ) + read_df = reader.run() + assert read_df.count() + assert read_df.schema == df.schema + assert_equal_df(read_df, df) + + +@pytest.mark.parametrize( + "options", + [ + {"rowTag": "item", "rootTag": "root"}, + {"rowTag": "item", "rootTag": "root", "compression": "gzip"}, + ], + ids=["without_compression", "with_compression"], +) +def test_xml_writer( + spark, + local_fs_file_df_connection_with_path, + file_df_dataframe, + options, +): + """Written files can be read by Spark""" + + file_df_connection, source_path = local_fs_file_df_connection_with_path + df = file_df_dataframe + xml_root = source_path / "xml" + + writer = FileDFWriter( + connection=file_df_connection, + format=XML.parse(options), + target_path=xml_root, + ) + writer.run(df) + + reader = FileDFReader( + connection=file_df_connection, + format=XML.parse(options), + source_path=xml_root, + df_schema=df.schema, + ) + read_df = reader.run() + + assert read_df.count() + assert read_df.schema == df.schema + assert_equal_df(read_df, df) diff --git a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py new file mode 100644 index 000000000..ad65c7f35 --- /dev/null +++ b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py @@ -0,0 +1,79 @@ +import logging + +import pytest + +from onetl.file.format import XML + + +@pytest.mark.parametrize( + "spark_version, scala_version, package_version, expected_packages", + [ + ("2.4.8", None, None, ["com.databricks:spark-xml_2.11:0.13.0"]), + ("2.4.8", "2.11", "0.13.0", ["com.databricks:spark-xml_2.11:0.13.0"]), + ("2.3.4", None, None, ["com.databricks:spark-xml_2.11:0.13.0"]), + ("2.3.4", "2.11", "0.12.0", ["com.databricks:spark-xml_2.11:0.12.0"]), + ("2.2.3", None, None, ["com.databricks:spark-xml_2.11:0.13.0"]), + ("2.2.3", "2.11", "0.11.0", ["com.databricks:spark-xml_2.11:0.11.0"]), + ("3.2.4", None, None, ["com.databricks:spark-xml_2.12:0.17.0"]), + ("3.4.1", "2.12", "0.18.0", ["com.databricks:spark-xml_2.12:0.18.0"]), + ("3.0.0", None, None, ["com.databricks:spark-xml_2.12:0.17.0"]), + ("3.0.0", "2.12", "0.17.0", ["com.databricks:spark-xml_2.12:0.17.0"]), + ("3.1.2", None, None, ["com.databricks:spark-xml_2.12:0.17.0"]), + ("3.1.2", "2.12", "0.16.0", ["com.databricks:spark-xml_2.12:0.16.0"]), + ("3.2.0", "2.12", None, ["com.databricks:spark-xml_2.12:0.17.0"]), + 
("3.2.0", "2.12", "0.15.0", ["com.databricks:spark-xml_2.12:0.15.0"]), + ("3.3.0", None, "0.16.0", ["com.databricks:spark-xml_2.12:0.16.0"]), + ("3.3.0", "2.12", None, ["com.databricks:spark-xml_2.12:0.17.0"]), + ], +) +def test_xml_get_packages(spark_version, scala_version, package_version, expected_packages): + result = XML.get_packages( + spark_version=spark_version, + scala_version=scala_version, + package_version=package_version, + ) + assert result == expected_packages + + +@pytest.mark.parametrize( + "known_option", + [ + "samplingRatio", + "excludeAttribute", + "treatEmptyValuesAsNulls", + "mode", + "inferSchema", + "columnNameOfCorruptRecord", + "attributePrefix", + "valueTag", + "charset", + "ignoreSurroundingSpaces", + "wildcardColName", + "rowValidationXSDPath", + "ignoreNamespace", + "timestampFormat", + "dateFormat", + "rootTag", + "declaration", + "arrayElementName", + "nullValue", + "compression", + ], +) +def test_xml_options_known(known_option): + xml = XML.parse({known_option: "value", "row_tag": "item"}) + assert getattr(xml, known_option) == "value" + + +def test_xml_options_unknown(caplog): + with caplog.at_level(logging.WARNING): + xml = XML(row_tag="item", unknownOption="abc") + assert xml.unknownOption == "abc" + assert "Options ['unknownOption'] are not known by XML, are you sure they are valid?" in caplog.text + + +@pytest.mark.local_fs +def test_xml_missing_package(spark_no_packages): + msg = "Cannot import Java class 'com.databricks.spark.xml.XmlReader'" + with pytest.raises(ValueError, match=msg): + XML(row_tag="item").check_if_supported(spark_no_packages) From a3e68ff0741a64738fc25ad06e257d30bb758e61 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Thu, 5 Oct 2023 18:51:33 +0300 Subject: [PATCH 02/14] [DOP-9645] - change .rst index --- docs/changelog/next_release/{162.feature.rst => 163.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/changelog/next_release/{162.feature.rst => 163.feature.rst} (100%) diff --git a/docs/changelog/next_release/162.feature.rst b/docs/changelog/next_release/163.feature.rst similarity index 100% rename from docs/changelog/next_release/162.feature.rst rename to docs/changelog/next_release/163.feature.rst From e56fb22dd195abcebe9cd87de934d1a884c55b2e Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 11:54:19 +0300 Subject: [PATCH 03/14] [DOP-9645] - restrict spark 2.x --- onetl/file/format/xml.py | 49 +++++++------------ .../test_xml_integration.py | 11 ++++- .../test_format_unit/test_xml_unit.py | 26 +++++++--- 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index feee20742..fcbeab04b 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -84,7 +84,7 @@ class XML(ReadWriteFileFormat): .. dropdown:: Version compatibility - * Spark versions: 2.x.x - 3.4.x. + * Spark versions: 3.x.x - 3.4.x. .. warning:: @@ -92,10 +92,9 @@ class XML(ReadWriteFileFormat): See `Maven index `_ and `official documentation `_. - Note that spark-xml is planned to become a part of Apache Spark 4.0, and this library will remain in maintenance mode for previous Spark versions. + Note that spark-xml is planned to become a part of Apache Spark 4.0, and this library will remain in maintenance mode for Spark 3.x versions. 
* Scala versions: - - Spark 2.0.x - 2.4.x: Scala 2.11 - Spark 3.0.x - 3.1.x: Scala 2.12 - Spark 3.2.x - 3.4.x: Scala 2.12, 2.13 * Java versions: 8 - 20 @@ -127,7 +126,7 @@ class XML(ReadWriteFileFormat): .getOrCreate() ) - xml = XML(row_tag="item", compression="gzip") + xml = XML(row_tag="item") """ @@ -169,7 +168,11 @@ def get_packages( # noqa: WPS231 If ``None``, ``spark_version`` is used to determine Scala version. version: str, optional - Package version in format ``major.minor.patch``. Default is ``0.17.0`` with Spark 3.x.x and ``0.13.0`` with Spark 2.x.x. + Package version in format ``major.minor.patch``. Default is ``0.17.0``. + + .. warning:: + + Version ``0.13`` and below are not supported. .. note:: @@ -193,35 +196,21 @@ def get_packages( # noqa: WPS231 """ + if package_version: + version = Version.parse(package_version) + log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) + else: + version = Version.parse("0.17.0") + spark_ver = Version.parse(spark_version) + scala_ver = Version.parse(scala_version) if scala_version else get_default_scala_version(spark_ver) # Ensure compatibility with Spark and Scala versions if spark_ver < (3, 0): - if not package_version: - version = Version.parse("0.13.0") # Last version supporting Spark 2.x - else: - log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) - version = Version.parse(package_version) - - if not scala_version: - scala_ver = Version.parse("2.11") - else: - scala_ver = Version.parse(scala_version) - if scala_ver < (2, 11) or scala_ver > (2, 11): - raise ValueError(f"For Spark 2.x, Scala version must be 2.11, got {scala_ver}") - else: - if not package_version: - version = Version.parse("0.17.0") - else: - log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) - version = Version.parse(package_version) - - if not scala_version: - scala_ver = get_default_scala_version(spark_ver) - else: - scala_ver = Version.parse(scala_version) - if scala_ver < (2, 12) or scala_ver > (2, 13): - raise ValueError(f"For Spark 3.x, Scala version must be 2.12 or 2.13, got {scala_ver}") + raise ValueError(f"Spark version must be 3.x, got {spark_ver}") + + if scala_ver < (2, 12) or scala_ver > (2, 13): + raise ValueError(f"Scala version must be 2.12 or 2.13, got {scala_ver}") return [f"com.databricks:spark-xml_{scala_ver.digits(2)}:{version.digits(3)}"] diff --git a/tests/tests_integration/test_file_format_integration/test_xml_integration.py b/tests/tests_integration/test_file_format_integration/test_xml_integration.py index acf58bf7c..516cd056f 100644 --- a/tests/tests_integration/test_file_format_integration/test_xml_integration.py +++ b/tests/tests_integration/test_file_format_integration/test_xml_integration.py @@ -6,6 +6,7 @@ import pytest +from onetl._util.spark import get_spark_version from onetl.file import FileDFReader, FileDFWriter from onetl.file.format import XML @@ -22,17 +23,22 @@ "path, options", [ ("without_compression", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX"}), + ("with_compression", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "compression": "gzip"}), ("with_attributes", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "attributePrefix": "_"}), ], - ids=["without_compression", "with_attributes"], + ids=["without_compression", "with_compression", "with_attributes"], ) def test_xml_reader( + spark, 
local_fs_file_df_connection_with_path_and_files, file_df_dataframe, path, options, ): """Reading XML files working as expected on any Spark, Python and Java versions""" + spark_version = get_spark_version(spark) + if spark_version < (3, 0): + pytest.skip("XML files are supported on Spark 3.x only") local_fs, source_path, _ = local_fs_file_df_connection_with_path_and_files df = file_df_dataframe @@ -65,6 +71,9 @@ def test_xml_writer( options, ): """Written files can be read by Spark""" + spark_version = get_spark_version(spark) + if spark_version < (3, 0): + pytest.skip("XML files are supported on Spark 3.x only") file_df_connection, source_path = local_fs_file_df_connection_with_path df = file_df_dataframe diff --git a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py index ad65c7f35..143af951d 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py @@ -8,12 +8,6 @@ @pytest.mark.parametrize( "spark_version, scala_version, package_version, expected_packages", [ - ("2.4.8", None, None, ["com.databricks:spark-xml_2.11:0.13.0"]), - ("2.4.8", "2.11", "0.13.0", ["com.databricks:spark-xml_2.11:0.13.0"]), - ("2.3.4", None, None, ["com.databricks:spark-xml_2.11:0.13.0"]), - ("2.3.4", "2.11", "0.12.0", ["com.databricks:spark-xml_2.11:0.12.0"]), - ("2.2.3", None, None, ["com.databricks:spark-xml_2.11:0.13.0"]), - ("2.2.3", "2.11", "0.11.0", ["com.databricks:spark-xml_2.11:0.11.0"]), ("3.2.4", None, None, ["com.databricks:spark-xml_2.12:0.17.0"]), ("3.4.1", "2.12", "0.18.0", ["com.databricks:spark-xml_2.12:0.18.0"]), ("3.0.0", None, None, ["com.databricks:spark-xml_2.12:0.17.0"]), @@ -35,6 +29,26 @@ def test_xml_get_packages(spark_version, scala_version, package_version, expecte assert result == expected_packages +@pytest.mark.parametrize( + "spark_version, scala_version, package_version", + [ + ("2.4.8", None, None), + ("2.4.8", "2.11", "0.13.0"), + ("2.3.4", None, None), + ("2.3.4", "2.11", "0.12.0"), + ("2.2.3", None, None), + ("2.2.3", "2.11", "0.11.0"), + ], +) +def test_xml_get_packages_restriction_for_spark_2x(spark_version, scala_version, package_version): + with pytest.raises(ValueError, match=r"Spark version must be 3.x, got \d+\.\d+"): + XML.get_packages( + spark_version=spark_version, + scala_version=scala_version, + package_version=package_version, + ) + + @pytest.mark.parametrize( "known_option", [ From 0259aeb42a2b06b3d5a497111b30dffff02dce0e Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 12:04:23 +0300 Subject: [PATCH 04/14] [DOP-9645] - update creating spark session --- tests/fixtures/spark.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/fixtures/spark.py b/tests/fixtures/spark.py index 357b6dd66..452c0f978 100644 --- a/tests/fixtures/spark.py +++ b/tests/fixtures/spark.py @@ -54,7 +54,6 @@ def maven_packages(): + Oracle.get_packages() + Postgres.get_packages() + Teradata.get_packages() - + XML.get_packages(pyspark_version) ) with_greenplum = os.getenv("ONETL_DB_WITH_GREENPLUM", "false").lower() == "true" @@ -72,6 +71,9 @@ def maven_packages(): # There is no SparkS3 connector for Spark less than 3 packages.extend(SparkS3.get_packages(spark_version=pyspark_version)) + # There is no XML files support for Spark less than 3 + packages.extend(XML.get_packages(pyspark_version)) + # There is no MongoDB connector for Spark less than 3.2 
packages.extend(MongoDB.get_packages(spark_version=pyspark_version)) From 03cd0e8707affe53a0a8d0226f8913e1ffb723c9 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 14:37:40 +0300 Subject: [PATCH 05/14] [DOP-9645] - update XML format tests --- onetl/file/format/xml.py | 30 +++++--------- .../test_xml_integration.py | 40 +++++++++++++++++++ .../test_format_unit/test_xml_unit.py | 8 ++++ 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index fcbeab04b..d94a93cd2 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -31,6 +31,14 @@ from pyspark.sql import SparkSession +PROHIBITED_OPTIONS = frozenset( + ( + # filled by onETL classes + "path", + ), +) + + READ_OPTIONS = frozenset( ( "rowTag", @@ -84,19 +92,10 @@ class XML(ReadWriteFileFormat): .. dropdown:: Version compatibility - * Spark versions: 3.x.x - 3.4.x. + * Spark versions: 3.2.x - 3.4.x. - .. warning:: + * Scala versions: 2.12 - 2.13 - Not all combinations of Spark version and package version are supported. - See `Maven index `_ - and `official documentation `_. - - Note that spark-xml is planned to become a part of Apache Spark 4.0, and this library will remain in maintenance mode for Spark 3.x versions. - - * Scala versions: - - Spark 3.0.x - 3.1.x: Scala 2.12 - - Spark 3.2.x - 3.4.x: Scala 2.12, 2.13 * Java versions: 8 - 20 @@ -136,6 +135,7 @@ class XML(ReadWriteFileFormat): class Config: known_options = READ_OPTIONS | WRITE_OPTIONS + prohibited_options = PROHIBITED_OPTIONS extra = "allow" @slot @@ -149,14 +149,6 @@ def get_packages( # noqa: WPS231 """ Get package names to be downloaded by Spark. |support_hooks| - .. warning:: - - Not all combinations of Spark version and package version are supported. - See `Maven index `_ - and `official documentation `_. - - Note that spark-xml is planned to become a part of Apache Spark 4.0, and this library will remain in maintenance mode for Spark 3.x versions. 
- Parameters ---------- spark_version : str diff --git a/tests/tests_integration/test_file_format_integration/test_xml_integration.py b/tests/tests_integration/test_file_format_integration/test_xml_integration.py index 516cd056f..257ac98a8 100644 --- a/tests/tests_integration/test_file_format_integration/test_xml_integration.py +++ b/tests/tests_integration/test_file_format_integration/test_xml_integration.py @@ -19,6 +19,13 @@ pytestmark = [pytest.mark.local_fs, pytest.mark.file_df_connection, pytest.mark.connection] +@pytest.fixture() +def expected_xml_attributes_df(file_df_dataframe): + col_names = file_df_dataframe.columns + exprs = [f"{col} as _{col}" for col in col_names] + col_names + return file_df_dataframe.selectExpr(*exprs) + + @pytest.mark.parametrize( "path, options", [ @@ -97,3 +104,36 @@ def test_xml_writer( assert read_df.count() assert read_df.schema == df.schema assert_equal_df(read_df, df) + + +@pytest.mark.parametrize( + "options", + [ + {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "attributePrefix": "_"}, + ], + ids=["read_attributes"], +) +def test_xml_reader_with_attributes( + spark, + local_fs_file_df_connection_with_path_and_files, + expected_xml_attributes_df, + options, +): + """Reading XML files with attributes works as expected""" + spark_version = get_spark_version(spark) + if spark_version < (3, 0): + pytest.skip("XML files are supported on Spark 3.x only") + + local_fs, source_path, _ = local_fs_file_df_connection_with_path_and_files + xml_root = source_path / "xml" / "with_attributes" + + reader = FileDFReader( + connection=local_fs, + format=XML.parse(options), + df_schema=expected_xml_attributes_df.schema, + source_path=xml_root, + ) + read_df = reader.run() + assert read_df.count() + assert read_df.schema == expected_xml_attributes_df.schema + assert_equal_df(read_df, expected_xml_attributes_df) diff --git a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py index 143af951d..1434e0eab 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py @@ -16,6 +16,8 @@ ("3.1.2", "2.12", "0.16.0", ["com.databricks:spark-xml_2.12:0.16.0"]), ("3.2.0", "2.12", None, ["com.databricks:spark-xml_2.12:0.17.0"]), ("3.2.0", "2.12", "0.15.0", ["com.databricks:spark-xml_2.12:0.15.0"]), + ("3.2.4", "2.13", None, ["com.databricks:spark-xml_2.13:0.17.0"]), + ("3.4.1", "2.13", "0.18.0", ["com.databricks:spark-xml_2.13:0.18.0"]), ("3.3.0", None, "0.16.0", ["com.databricks:spark-xml_2.12:0.16.0"]), ("3.3.0", "2.12", None, ["com.databricks:spark-xml_2.12:0.17.0"]), ], @@ -79,6 +81,12 @@ def test_xml_options_known(known_option): assert getattr(xml, known_option) == "value" +def test_xml_option_path_error(caplog): + msg = r"Options \['path'\] are not allowed to use in a XML" + with pytest.raises(ValueError, match=msg): + XML(row_tag="item", path="/path") + + def test_xml_options_unknown(caplog): with caplog.at_level(logging.WARNING): xml = XML(row_tag="item", unknownOption="abc") From 083d8e66b47657348c0fa5f7b9fe70b7ef0b5fe5 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 15:46:12 +0300 Subject: [PATCH 06/14] [DOP-9645] - add tests for scala and package versions --- onetl/file/format/xml.py | 2 ++ .../test_format_unit/test_xml_unit.py | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index d94a93cd2..a86da3afe 100644 --- 
a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -190,6 +190,8 @@ def get_packages( # noqa: WPS231 if package_version: version = Version.parse(package_version) + if version <= Version.parse(version): + raise ValueError(f"Package version must be above 0.13, got {version}") log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) else: version = Version.parse("0.17.0") diff --git a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py index 1434e0eab..18400b25c 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py @@ -51,6 +51,38 @@ def test_xml_get_packages_restriction_for_spark_2x(spark_version, scala_version, ) +@pytest.mark.parametrize( + "spark_version, scala_version, package_version", + [ + ("3.2.4", "2.11", None), + ("3.4.1", "2.14", None), + ], +) +def test_xml_get_packages_scala_version_error(spark_version, scala_version, package_version): + with pytest.raises(ValueError, match=r"Scala version must be 2.12 or 2.13, got \d+\.\d+"): + XML.get_packages( + spark_version=spark_version, + scala_version=scala_version, + package_version=package_version, + ) + + +@pytest.mark.parametrize( + "spark_version, scala_version, package_version", + [ + ("3.2.4", "2.12", "0.13.0"), + ("3.4.1", "2.12", "0.10.0"), + ], +) +def test_xml_get_packages_package_version_error(spark_version, scala_version, package_version): + with pytest.raises(ValueError, match=r"Package version must be above 0.13, got \d+\.\d+\.\d+"): + XML.get_packages( + spark_version=spark_version, + scala_version=scala_version, + package_version=package_version, + ) + + @pytest.mark.parametrize( "known_option", [ From e3f11f91104013500c4c624a859861ec03d440fc Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 15:55:04 +0300 Subject: [PATCH 07/14] [DOP-9645] - corrected version condition --- onetl/file/format/xml.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index a86da3afe..9ac83bbb1 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -190,7 +190,7 @@ def get_packages( # noqa: WPS231 if package_version: version = Version.parse(package_version) - if version <= Version.parse(version): + if version < (0, 14): raise ValueError(f"Package version must be above 0.13, got {version}") log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) else: From b39a8adacaa3d1fe73dee58e0f05713586bf539b Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 15:55:04 +0300 Subject: [PATCH 08/14] [DOP-9645] - corrected version condition --- onetl/file/format/xml.py | 2 +- tests/tests_unit/test_file/test_format_unit/test_xml_unit.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index a86da3afe..9ac83bbb1 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -190,7 +190,7 @@ def get_packages( # noqa: WPS231 if package_version: version = Version.parse(package_version) - if version <= Version.parse(version): + if version < (0, 14): raise ValueError(f"Package version must be above 0.13, got {version}") log.warning("Passed custom package version %r, it is not guaranteed to be supported", package_version) else: diff --git a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py 
b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py index 18400b25c..17d9dac2a 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_xml_unit.py @@ -35,11 +35,7 @@ def test_xml_get_packages(spark_version, scala_version, package_version, expecte "spark_version, scala_version, package_version", [ ("2.4.8", None, None), - ("2.4.8", "2.11", "0.13.0"), ("2.3.4", None, None), - ("2.3.4", "2.11", "0.12.0"), - ("2.2.3", None, None), - ("2.2.3", "2.11", "0.11.0"), ], ) def test_xml_get_packages_restriction_for_spark_2x(spark_version, scala_version, package_version): From f9e12901fae9240329e5937510edd39da9db84f4 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Fri, 6 Oct 2023 17:20:52 +0300 Subject: [PATCH 09/14] [DOP-9645] - add XML inferSchema test --- .../test_xml_integration.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/tests_integration/test_file_format_integration/test_xml_integration.py b/tests/tests_integration/test_file_format_integration/test_xml_integration.py index 257ac98a8..39f3245f9 100644 --- a/tests/tests_integration/test_file_format_integration/test_xml_integration.py +++ b/tests/tests_integration/test_file_format_integration/test_xml_integration.py @@ -63,6 +63,36 @@ def test_xml_reader( assert_equal_df(read_df, df) +def test_xml_reader_with_infer_schema( + spark, + local_fs_file_df_connection_with_path_and_files, + expected_xml_attributes_df, + file_df_dataframe, +): + """Reading XML files with inferSchema=True working as expected on any Spark, Python and Java versions""" + spark_version = get_spark_version(spark) + if spark_version < (3, 0): + pytest.skip("XML files are supported on Spark 3.x only") + + file_df_connection, source_path, _ = local_fs_file_df_connection_with_path_and_files + df = file_df_dataframe + xml_root = source_path / "xml" / "with_attributes" + + reader = FileDFReader( + connection=file_df_connection, + format=XML(rowTag="item", inferSchema=True, timestampFormat="yyyy-MM-dd HH:mm:ssXXX"), + source_path=xml_root, + ) + read_df = reader.run() + + assert read_df.count() + assert read_df.schema != df.schema + assert set(read_df.columns) == set( + expected_xml_attributes_df.columns, + ) # "DataFrames have different column types: StructField('id', IntegerType(), True), StructField('id', LongType(), True), etc." + assert_equal_df(read_df, expected_xml_attributes_df) + + @pytest.mark.parametrize( "options", [ From 618786def891344e077ff655fff04945cebcf1a1 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 9 Oct 2023 10:05:53 +0300 Subject: [PATCH 10/14] [DOP-9645] - add timestamp info in class docs --- onetl/file/format/xml.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index 9ac83bbb1..0f9a770d4 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -108,6 +108,8 @@ class XML(ReadWriteFileFormat): The set of supported options depends on Spark version. See link above. + **Important**: When **reading** files with timestamps, it may be necessary to specify the ``timestampFormat`` option to ensure dates are parsed correctly. Without it, date parsing may return ``null`` values. Example: ``timestampFormat="yyyy-MM-dd HH:mm:ssXXX"``. 
+ Examples -------- Describe options how to read from/write to XML file with specific options: From cef7f1975fcc49b3687e2eeec662f3e14de9ecd2 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 9 Oct 2023 11:34:47 +0300 Subject: [PATCH 11/14] [DOP-9645] - update tests, documentation --- onetl/file/format/xml.py | 8 +++++++- .../file_df_connection/generate_files.py | 19 ++++++++++++++---- .../with_attributes/file_with_attributes.xml | 2 +- .../xml/with_compression/file.xml.gz | Bin 294 -> 294 bytes .../xml/without_compression/file.xml | 2 +- .../test_xml_integration.py | 10 ++++----- 6 files changed, 29 insertions(+), 12 deletions(-) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index 0f9a770d4..df94b24f1 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -108,7 +108,11 @@ class XML(ReadWriteFileFormat): The set of supported options depends on Spark version. See link above. - **Important**: When **reading** files with timestamps, it may be necessary to specify the ``timestampFormat`` option to ensure dates are parsed correctly. Without it, date parsing may return ``null`` values. Example: ``timestampFormat="yyyy-MM-dd HH:mm:ssXXX"``. + .. warning:: + + When interacting with files with timestamps, it may be necessary to specify the ``timestampFormat`` option to + ensure that dates are parsed correctly. Without it, date parsing may return ``null`` values. + Example: ``timestampFormat="yyyy-MM-ddTHH:mm:ss.SSSXXX"``. Examples -------- @@ -134,6 +138,8 @@ class XML(ReadWriteFileFormat): name: ClassVar[str] = "xml" row_tag: str = Field(alias="rowTag") + # Unable to use default timestamp_format due to a source code bug causing an UnsupportedTemporalTypeException (Unsupported field: ). + # timestamp_format: str = Field(default="yyyy-MM-ddTHH:mm:ss.SSSXXX", alias="timestampFormat") class Config: known_options = READ_OPTIONS | WRITE_OPTIONS diff --git a/tests/resources/file_df_connection/generate_files.py b/tests/resources/file_df_connection/generate_files.py index 21f3cf7fc..2417cb15d 100755 --- a/tests/resources/file_df_connection/generate_files.py +++ b/tests/resources/file_df_connection/generate_files.py @@ -481,7 +481,10 @@ def save_as_xml_plain(data: list[dict], path: Path) -> None: item = ElementTree.SubElement(root, "item") for key, value in record.items(): child = ElementTree.SubElement(item, key) - child.text = str(value) + if isinstance(value, datetime): + child.text = value.isoformat() + else: + child.text = str(value) tree = ElementTree.ElementTree(root) tree.write(path / "file.xml") @@ -492,11 +495,16 @@ def save_as_xml_with_attributes(data: list[dict], path: Path) -> None: root = ElementTree.Element("root") for record in data: - str_attributes = {key: str(value) for key, value in record.items()} + str_attributes = { + key: value.isoformat() if isinstance(value, datetime) else str(value) for key, value in record.items() + } item = ElementTree.SubElement(root, "item", attrib=str_attributes) for key, value in record.items(): child = ElementTree.SubElement(item, key) - child.text = str(value) + if isinstance(value, datetime): + child.text = value.isoformat() + else: + child.text = str(value) tree = ElementTree.ElementTree(root) tree.write(str(path / "file_with_attributes.xml")) @@ -510,7 +518,10 @@ def save_as_xml_gz(data: list[dict], path: Path) -> None: item = ElementTree.SubElement(root, "item") for key, value in record.items(): child = ElementTree.SubElement(item, key) - child.text = str(value) + if isinstance(value, datetime): + child.text = 
value.isoformat() + else: + child.text = str(value) ElementTree.ElementTree(root) xml_string = ElementTree.tostring(root, encoding="utf-8") diff --git a/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml b/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml index 9c170e560..f6fcbc7df 100644 --- a/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml +++ b/tests/resources/file_df_connection/xml/with_attributes/file_with_attributes.xml @@ -1 +1 @@ -1val11232021-01-012021-01-01 01:01:01+00:001.232val12342022-02-022022-02-02 02:02:02+00:002.343val23452023-03-032023-03-03 03:03:03+00:003.454val24562024-04-042024-04-04 04:04:04+00:004.565val35672025-05-052025-05-05 05:05:05+00:005.676val36782026-06-062026-06-06 06:06:06+00:006.787val37892027-07-072027-07-07 07:07:07+00:007.89 \ No newline at end of file +1val11232021-01-012021-01-01T01:01:01+00:001.232val12342022-02-022022-02-02T02:02:02+00:002.343val23452023-03-032023-03-03T03:03:03+00:003.454val24562024-04-042024-04-04T04:04:04+00:004.565val35672025-05-052025-05-05T05:05:05+00:005.676val36782026-06-062026-06-06T06:06:06+00:006.787val37892027-07-072027-07-07T07:07:07+00:007.89 \ No newline at end of file diff --git a/tests/resources/file_df_connection/xml/with_compression/file.xml.gz b/tests/resources/file_df_connection/xml/with_compression/file.xml.gz index 9d255211fe04d2d530721df454470dd1e577d301..aefbf24afd3c28d1bc2bc81979e4a425683aca82 100644 GIT binary patch literal 294 zcmV+>0onc^iwFpGxg%u)|7K}yWiEJaYyg$i?`py@6b0}H8cA>SS3~Gif15C|PY5}CnX>mx7%cMi?a42mEhhl)K zIZ7>naycyy#c`Q*=naQ_vCEgmgDl4qsxE4zyBQ8Cxrz701cLmt^fc4 literal 294 zcmV+>0onc^iwFpq)gEO6|7K}yWiEJaYyg$i?`py@6b0~C$#+S5o4+oEzRDE13Z4UYKHS=8K1)fm)AGxJ05~Rhuq_ad1E%IEH3rJ(v^bQ&WzwN{IJB{ZLvz5? 
z9HquU`1val11232021-01-012021-01-01 01:01:01+00:001.232val12342022-02-022022-02-02 02:02:02+00:002.343val23452023-03-032023-03-03 03:03:03+00:003.454val24562024-04-042024-04-04 04:04:04+00:004.565val35672025-05-052025-05-05 05:05:05+00:005.676val36782026-06-062026-06-06 06:06:06+00:006.787val37892027-07-072027-07-07 07:07:07+00:007.89 \ No newline at end of file +1val11232021-01-012021-01-01T01:01:01+00:001.232val12342022-02-022022-02-02T02:02:02+00:002.343val23452023-03-032023-03-03T03:03:03+00:003.454val24562024-04-042024-04-04T04:04:04+00:004.565val35672025-05-052025-05-05T05:05:05+00:005.676val36782026-06-062026-06-06T06:06:06+00:006.787val37892027-07-072027-07-07T07:07:07+00:007.89 \ No newline at end of file diff --git a/tests/tests_integration/test_file_format_integration/test_xml_integration.py b/tests/tests_integration/test_file_format_integration/test_xml_integration.py index 39f3245f9..d03a6f61d 100644 --- a/tests/tests_integration/test_file_format_integration/test_xml_integration.py +++ b/tests/tests_integration/test_file_format_integration/test_xml_integration.py @@ -29,9 +29,9 @@ def expected_xml_attributes_df(file_df_dataframe): @pytest.mark.parametrize( "path, options", [ - ("without_compression", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX"}), - ("with_compression", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "compression": "gzip"}), - ("with_attributes", {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "attributePrefix": "_"}), + ("without_compression", {"rowTag": "item"}), + ("with_compression", {"rowTag": "item", "compression": "gzip"}), + ("with_attributes", {"rowTag": "item", "attributePrefix": "_"}), ], ids=["without_compression", "with_compression", "with_attributes"], ) @@ -80,7 +80,7 @@ def test_xml_reader_with_infer_schema( reader = FileDFReader( connection=file_df_connection, - format=XML(rowTag="item", inferSchema=True, timestampFormat="yyyy-MM-dd HH:mm:ssXXX"), + format=XML(rowTag="item", inferSchema=True), source_path=xml_root, ) read_df = reader.run() @@ -139,7 +139,7 @@ def test_xml_writer( @pytest.mark.parametrize( "options", [ - {"rowTag": "item", "timestampFormat": "yyyy-MM-dd HH:mm:ssXXX", "attributePrefix": "_"}, + {"rowTag": "item", "attributePrefix": "_"}, ], ids=["read_attributes"], ) From 91e53399c3169728e239b085df040d96d4ff0af1 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 9 Oct 2023 12:19:56 +0300 Subject: [PATCH 12/14] [DOP-9645] - update documentation --- onetl/file/format/xml.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index df94b24f1..152398c7d 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -112,7 +112,9 @@ class XML(ReadWriteFileFormat): When interacting with files with timestamps, it may be necessary to specify the ``timestampFormat`` option to ensure that dates are parsed correctly. Without it, date parsing may return ``null`` values. - Example: ``timestampFormat="yyyy-MM-ddTHH:mm:ss.SSSXXX"``. + Example: ``timestampFormat="yyyy-MM-ddTHH:mm:ss.SSSXXX"``. This warning applies to columns of all data types. + By default, ``mode=PERMISSIVE`` replaces improperly parsed values with ``null``. + Using ``mode=FAILFAST`` will throw an exception upon parsing errors. 
 
     Examples
     --------

From 514085ad621bb50edbec721f6f2f52e914d3d623 Mon Sep 17 00:00:00 2001
From: Maxim Liksakov <67663774+maxim-lixakov@users.noreply.github.com>
Date: Mon, 9 Oct 2023 13:30:40 +0300
Subject: [PATCH 13/14] Update onetl/file/format/xml.py

Co-authored-by: Maxim Martynov
---
 onetl/file/format/xml.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py
index 152398c7d..6962c45f2 100644
--- a/onetl/file/format/xml.py
+++ b/onetl/file/format/xml.py
@@ -87,6 +87,10 @@ class XML(ReadWriteFileFormat):
     Based on `Databricks Spark XML `_ file format.
 
     Supports reading/writing files with ``.xml`` extension.
+    
+    .. warning:: 
+    
+        Due to `bug `_ written files currently do not have ``.xml`` extension. 
 
     .. versionadded:: 0.9.5
 

From 52d600f1d3d9a37e5031b80b99dac66dfaa9a796 Mon Sep 17 00:00:00 2001
From: maxim-lixakov
Date: Mon, 9 Oct 2023 13:40:01 +0300
Subject: [PATCH 14/14] [DOP-9645] - git update documentation

---
 onetl/file/format/xml.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py
index 6962c45f2..3af9f2ed1 100644
--- a/onetl/file/format/xml.py
+++ b/onetl/file/format/xml.py
@@ -87,10 +87,10 @@ class XML(ReadWriteFileFormat):
     Based on `Databricks Spark XML `_ file format.
 
     Supports reading/writing files with ``.xml`` extension.
-    
+
     .. warning:: 
-    
-        Due to `bug `_ written files currently do not have ``.xml`` extension. 
+
+        Due to `bug `_ written files currently do not have ``.xml`` extension.
 
     .. versionadded:: 0.9.5
 
@@ -114,11 +114,10 @@ class XML(ReadWriteFileFormat):
 
     .. warning::
 
-        When interacting with files with timestamps, it may be necessary to specify the ``timestampFormat`` option to
-        ensure that dates are parsed correctly. Without it, date parsing may return ``null`` values.
-        Example: ``timestampFormat="yyyy-MM-ddTHH:mm:ss.SSSXXX"``. This warning applies to columns of all data types.
-        By default, ``mode=PERMISSIVE`` replaces improperly parsed values with ``null``.
-        Using ``mode=FAILFAST`` will throw an exception upon parsing errors.
+        By default, reading is done using ``mode=PERMISSIVE`` which replaces columns with wrong data type or format with ``null`` values.
+        Be careful while parsing values like timestamps, they should match the ``timestampFormat`` option.
+        Using ``mode=FAILFAST`` will throw an exception instead of producing ``null`` values.
+        `Follow `_
 
     Examples
     --------
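
For reference, a minimal end-to-end sketch of the feature introduced by this patch series: reading XML files through ``FileDFReader`` with the new ``XML`` format class. This is not part of the patches above; it assumes onETL 0.9.5+ on Spark 3.4.x, and the ``SparkLocalFS`` connection and the ``/data/xml`` path are illustrative placeholders.

.. code:: python

    from pyspark.sql import SparkSession

    from onetl.connection import SparkLocalFS  # assumed connection class for local files
    from onetl.file import FileDFReader
    from onetl.file.format import XML

    # make Spark download the spark-xml package while building the session
    maven_packages = XML.get_packages(spark_version="3.4.1")
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    local_fs = SparkLocalFS(spark=spark)

    # each <item> element becomes one row; schema is inferred from the data
    reader = FileDFReader(
        connection=local_fs,
        format=XML(row_tag="item", inferSchema=True),
        source_path="/data/xml",  # hypothetical directory with .xml files
    )
    df = reader.run()
    df.show()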