From 8dc0d1faff605135931c872a249b70d6c17ed179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 3 Sep 2024 12:56:16 +0000 Subject: [PATCH 01/28] [DOP-18743] Bump version --- onetl/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/VERSION b/onetl/VERSION index ac454c6a..34a83616 100644 --- a/onetl/VERSION +++ b/onetl/VERSION @@ -1 +1 @@ -0.12.0 +0.12.1 From 496d7a7bf4fec343c25c37ad2a8249f38b97d795 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 23 Sep 2024 21:27:49 +0000 Subject: [PATCH 02/28] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/pycqa/bandit: 1.7.9 → 1.7.10](https://github.com/pycqa/bandit/compare/1.7.9...1.7.10) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8d9215d3..af00fcc4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -114,7 +114,7 @@ repos: - black==24.4.2 - repo: https://github.com/pycqa/bandit - rev: 1.7.9 + rev: 1.7.10 hooks: - id: bandit args: From afeb7920490def055e2ff657fc5c2e09e8ca96ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 12:33:47 +0000 Subject: [PATCH 03/28] Start enumeration from 1 while checking for active Hadoop namenode --- onetl/connection/file_connection/hdfs/connection.py | 2 +- .../file_df_connection/spark_hdfs/connection.py | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/onetl/connection/file_connection/hdfs/connection.py b/onetl/connection/file_connection/hdfs/connection.py index 8cb6d1b5..4216f80f 100644 --- a/onetl/connection/file_connection/hdfs/connection.py +++ b/onetl/connection/file_connection/hdfs/connection.py @@ -378,7 +378,7 @@ def _get_active_namenode(self) -> str: raise RuntimeError(f"Cannot get list of namenodes for a cluster {self.cluster!r}") nodes_len = len(namenodes) - for i, namenode in enumerate(namenodes): + for i, namenode in enumerate(namenodes, start=1): log.debug("|%s| Trying namenode %r (%d of %d) ...", class_name, namenode, i, nodes_len) if self.Slots.is_namenode_active(namenode, self.cluster): log.info("|%s| Node %r is active!", class_name, namenode) diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index 36d20d4b..ab13b530 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -313,7 +313,7 @@ def _get_active_namenode(self) -> str: raise RuntimeError(f"Cannot get list of namenodes for a cluster {self.cluster!r}") nodes_len = len(namenodes) - for i, namenode in enumerate(namenodes): + for i, namenode in enumerate(namenodes, start=1): log.debug("|%s| Trying namenode %r (%d of %d) ...", class_name, namenode, i, nodes_len) if self.Slots.is_namenode_active(namenode, self.cluster): log.info("|%s| Node %r is active!", class_name, namenode) @@ -343,13 +343,9 @@ def _get_host(self) -> str: def _convert_to_url(self, path: PurePathProtocol) 
-> str: # "hdfs://namenode:8020/absolute/path" if host is set - if self._active_host: - host = self._active_host - else: - host = self._get_host() - # cache value to avoid getting active namenode for every path - self._active_host = host - return f"hdfs://{host}:{self.ipc_port}" + path.as_posix() + if not self._active_host: + self._active_host = self._get_host() + return f"hdfs://{self._active_host}:{self.ipc_port}" + path.as_posix() def _get_default_path(self): return RemotePath("/user") / getpass.getuser() From 7a76dcc463711d162458d52a3bfb3549a24db7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 12:34:16 +0000 Subject: [PATCH 04/28] Update Hive write example --- docs/connection/db_connection/hive/write.rst | 24 +++++++++----------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/docs/connection/db_connection/hive/write.rst b/docs/connection/db_connection/hive/write.rst index cd707fae..b4376930 100644 --- a/docs/connection/db_connection/hive/write.rst +++ b/docs/connection/db_connection/hive/write.rst @@ -19,18 +19,17 @@ Examples df = ... # data is here # Create dataframe with specific number of Spark partitions. - # Use the Hive partitioning columns to group the data. Create only 20 files per Hive partition. - # Also sort the data by column which most data is correlated with, reducing files size. + # Use the Hive partitioning columns to group the data. Create max 20 files per Hive partition. + # Also sort the data by column which most data is correlated with (e.g. user_id), reducing files size. + + num_files_per_partition = 20 + partition_columns = ["country", "business_date"] + sort_columns = ["user_id"] write_df = df.repartition( - 20, - "country", - "business_date", - "user_id", - ).sortWithinPartitions( - "country", - "business_date", - "user_id", - ) + num_files_per_partition, + *partition_columns, + *sort_columns, + ).sortWithinPartitions(*partition_columns, *sort_columns) writer = DBWriter( connection=hive, @@ -38,8 +37,7 @@ Examples options=Hive.WriteOptions( if_exists="append", # Hive partitioning columns. - # `user_id`` column is not included, as it has a lot of distinct values. 
- partitionBy=["country", "business_date"], + partitionBy=partition_columns, ), ) From a39764b04385f4ac0ac56e9026c68e78452dc579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 12:40:34 +0000 Subject: [PATCH 05/28] Use Spark 3.5.3 for tests --- .github/workflows/data/clickhouse/matrix.yml | 2 +- .github/workflows/data/core/matrix.yml | 2 +- .github/workflows/data/hdfs/matrix.yml | 2 +- .github/workflows/data/hive/matrix.yml | 2 +- .github/workflows/data/kafka/matrix.yml | 2 +- .github/workflows/data/mongodb/matrix.yml | 2 +- .github/workflows/data/mssql/matrix.yml | 2 +- .github/workflows/data/mysql/matrix.yml | 2 +- .github/workflows/data/oracle/matrix.yml | 2 +- .github/workflows/data/postgres/matrix.yml | 2 +- .github/workflows/data/s3/matrix.yml | 2 +- .github/workflows/data/teradata/matrix.yml | 2 +- CONTRIBUTING.rst | 2 +- README.rst | 6 +++--- docker-compose.yml | 2 +- docker/Dockerfile | 2 +- docs/connection/db_connection/clickhouse/types.rst | 4 ++-- docs/connection/db_connection/mssql/types.rst | 4 ++-- docs/connection/db_connection/mysql/types.rst | 4 ++-- docs/connection/db_connection/oracle/types.rst | 4 ++-- docs/connection/db_connection/postgres/types.rst | 4 ++-- onetl/_metrics/extract.py | 2 +- onetl/_metrics/listener/base.py | 2 +- onetl/_metrics/listener/execution.py | 12 ++++++------ onetl/_metrics/listener/job.py | 4 ++-- onetl/_metrics/listener/listener.py | 2 +- onetl/_metrics/listener/stage.py | 6 +++--- onetl/_metrics/listener/task.py | 6 +++--- onetl/_util/spark.py | 2 +- onetl/connection/db_connection/kafka/connection.py | 2 +- .../file_df_connection/spark_s3/connection.py | 6 +++--- onetl/file/format/avro.py | 2 +- onetl/file/format/xml.py | 8 ++++---- .../tests/{spark-3.5.2.txt => spark-3.5.3.txt} | 2 +- .../test_file/test_format_unit/test_avro_unit.py | 8 ++++---- .../test_file/test_format_unit/test_excel_unit.py | 10 +++++----- .../test_spark_s3_unit.py | 6 +++--- 37 files changed, 68 insertions(+), 68 deletions(-) rename requirements/tests/{spark-3.5.2.txt => spark-3.5.3.txt} (76%) diff --git a/.github/workflows/data/clickhouse/matrix.yml b/.github/workflows/data/clickhouse/matrix.yml index 1373d5e8..075d98a0 100644 --- a/.github/workflows/data/clickhouse/matrix.yml +++ b/.github/workflows/data/clickhouse/matrix.yml @@ -11,7 +11,7 @@ min: &min max: &max clickhouse-image: clickhouse/clickhouse-server clickhouse-version: 24.8.2.3-alpine - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/core/matrix.yml b/.github/workflows/data/core/matrix.yml index 504f1d4d..67d32ce3 100644 --- a/.github/workflows/data/core/matrix.yml +++ b/.github/workflows/data/core/matrix.yml @@ -6,7 +6,7 @@ min: &min os: ubuntu-latest max: &max - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/hdfs/matrix.yml b/.github/workflows/data/hdfs/matrix.yml index f8bae7d5..bb913214 100644 --- a/.github/workflows/data/hdfs/matrix.yml +++ b/.github/workflows/data/hdfs/matrix.yml @@ -8,7 +8,7 @@ min: &min max: &max hadoop-version: hadoop3-hdfs - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/hive/matrix.yml 
b/.github/workflows/data/hive/matrix.yml index 31b2120f..6bb53edf 100644 --- a/.github/workflows/data/hive/matrix.yml +++ b/.github/workflows/data/hive/matrix.yml @@ -6,7 +6,7 @@ min: &min os: ubuntu-latest max: &max - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/kafka/matrix.yml b/.github/workflows/data/kafka/matrix.yml index 4ff5fe64..309c0cae 100644 --- a/.github/workflows/data/kafka/matrix.yml +++ b/.github/workflows/data/kafka/matrix.yml @@ -12,7 +12,7 @@ min: &min max: &max kafka-version: 3.7.1 pydantic-version: 2 - spark-version: 3.5.2 + spark-version: 3.5.3 python-version: '3.12' java-version: 20 os: ubuntu-latest diff --git a/.github/workflows/data/mongodb/matrix.yml b/.github/workflows/data/mongodb/matrix.yml index 11892d65..6df9c853 100644 --- a/.github/workflows/data/mongodb/matrix.yml +++ b/.github/workflows/data/mongodb/matrix.yml @@ -9,7 +9,7 @@ min: &min max: &max mongodb-version: 7.0.14 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index 3748a0a7..490bc1fb 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -8,7 +8,7 @@ min: &min max: &max mssql-version: 2022-CU14-ubuntu-22.04 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/mysql/matrix.yml b/.github/workflows/data/mysql/matrix.yml index 17dacdb2..b98c985c 100644 --- a/.github/workflows/data/mysql/matrix.yml +++ b/.github/workflows/data/mysql/matrix.yml @@ -10,7 +10,7 @@ min: &min max: &max mysql-version: 9.0.1 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/oracle/matrix.yml b/.github/workflows/data/oracle/matrix.yml index ccafa20f..051f0df9 100644 --- a/.github/workflows/data/oracle/matrix.yml +++ b/.github/workflows/data/oracle/matrix.yml @@ -12,7 +12,7 @@ max: &max oracle-image: gvenzl/oracle-free oracle-version: 23.4-slim-faststart db-name: FREEPDB1 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/postgres/matrix.yml b/.github/workflows/data/postgres/matrix.yml index cd63ae03..43b914e9 100644 --- a/.github/workflows/data/postgres/matrix.yml +++ b/.github/workflows/data/postgres/matrix.yml @@ -9,7 +9,7 @@ min: &min max: &max postgres-version: 16.4-alpine - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/s3/matrix.yml b/.github/workflows/data/s3/matrix.yml index 3990f312..5a408373 100644 --- a/.github/workflows/data/s3/matrix.yml +++ b/.github/workflows/data/s3/matrix.yml @@ -10,7 +10,7 @@ min: &min max: &max minio-version: 2024.8.29 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/teradata/matrix.yml b/.github/workflows/data/teradata/matrix.yml index d9792be6..e9e4b5fa 100644 --- a/.github/workflows/data/teradata/matrix.yml +++ b/.github/workflows/data/teradata/matrix.yml @@ -1,5 +1,5 @@ max: &max - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 
aa1a3c03..fe08ff0b 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -71,7 +71,7 @@ Create virtualenv and install dependencies: -r requirements/tests/postgres.txt \ -r requirements/tests/oracle.txt \ -r requirements/tests/pydantic-2.txt \ - -r requirements/tests/spark-3.5.2.txt + -r requirements/tests/spark-3.5.3.txt # TODO: remove after https://github.com/zqmillet/sphinx-plantuml/pull/4 pip install sphinx-plantuml --no-deps diff --git a/README.rst b/README.rst index 9def167f..79561c1d 100644 --- a/README.rst +++ b/README.rst @@ -184,7 +184,7 @@ Compatibility matrix +--------------------------------------------------------------+-------------+-------------+-------+ | `3.4.x `_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 | +--------------------------------------------------------------+-------------+-------------+-------+ -| `3.5.x `_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 | +| `3.5.x `_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 | +--------------------------------------------------------------+-------------+-------------+-------+ .. _pyspark-install: @@ -199,7 +199,7 @@ or install PySpark explicitly: .. code:: bash - pip install onetl pyspark==3.5.2 # install a specific PySpark version + pip install onetl pyspark==3.5.3 # install a specific PySpark version or inject PySpark to ``sys.path`` in some other way BEFORE creating a class instance. **Otherwise connection object cannot be created.** @@ -540,7 +540,7 @@ Read files directly from S3 path, convert them to dataframe, transform it and th setup_logging() # Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded - maven_packages = SparkS3.get_packages(spark_version="3.5.2") + Postgres.get_packages() + maven_packages = SparkS3.get_packages(spark_version="3.5.3") + Postgres.get_packages() spark = ( SparkSession.builder.appName("spark_app_onetl_demo") .config("spark.jars.packages", ",".join(maven_packages)) diff --git a/docker-compose.yml b/docker-compose.yml index d32f682a..9764ed00 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: context: . 
target: base args: - SPARK_VERSION: 3.5.2 + SPARK_VERSION: 3.5.3 env_file: .env.docker volumes: - ./:/app/ diff --git a/docker/Dockerfile b/docker/Dockerfile index 68f40a52..6dfbaf56 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -44,7 +44,7 @@ ENV PATH=${ONETL_USER_HOME}/.local/bin:${PATH} COPY --chown=onetl:onetl ./run_tests.sh ./pytest_runner.sh ./combine_coverage.sh /app/ RUN chmod +x /app/run_tests.sh /app/pytest_runner.sh /app/combine_coverage.sh -ARG SPARK_VERSION=3.5.2 +ARG SPARK_VERSION=3.5.3 # Spark is heavy, and version change is quite rare COPY --chown=onetl:onetl ./requirements/tests/spark-${SPARK_VERSION}.txt /app/requirements/tests/ RUN pip install -r /app/requirements/tests/spark-${SPARK_VERSION}.txt diff --git a/docs/connection/db_connection/clickhouse/types.rst b/docs/connection/db_connection/clickhouse/types.rst index 0d8c5675..6135e066 100644 --- a/docs/connection/db_connection/clickhouse/types.rst +++ b/docs/connection/db_connection/clickhouse/types.rst @@ -106,8 +106,8 @@ References Here you can find source code with type conversions: * `Clickhouse -> JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ * `JDBC -> Clickhouse `_ Supported types diff --git a/docs/connection/db_connection/mssql/types.rst b/docs/connection/db_connection/mssql/types.rst index 852289ad..3a8b2d36 100644 --- a/docs/connection/db_connection/mssql/types.rst +++ b/docs/connection/db_connection/mssql/types.rst @@ -101,8 +101,8 @@ References Here you can find source code with type conversions: * `MSSQL -> JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ * `JDBC -> MSSQL `_ Supported types diff --git a/docs/connection/db_connection/mysql/types.rst b/docs/connection/db_connection/mysql/types.rst index 001a221f..71795925 100644 --- a/docs/connection/db_connection/mysql/types.rst +++ b/docs/connection/db_connection/mysql/types.rst @@ -97,8 +97,8 @@ References Here you can find source code with type conversions: * `MySQL -> JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ * `JDBC -> MySQL `_ Supported types diff --git a/docs/connection/db_connection/oracle/types.rst b/docs/connection/db_connection/oracle/types.rst index 2433b0f7..28ece909 100644 --- a/docs/connection/db_connection/oracle/types.rst +++ b/docs/connection/db_connection/oracle/types.rst @@ -101,8 +101,8 @@ See `List of Oracle types Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ Numeric types ~~~~~~~~~~~~~ diff --git a/docs/connection/db_connection/postgres/types.rst b/docs/connection/db_connection/postgres/types.rst index f0fe8821..e47336f0 100644 --- a/docs/connection/db_connection/postgres/types.rst +++ b/docs/connection/db_connection/postgres/types.rst @@ -109,8 +109,8 @@ See `List of Postgres types JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ Numeric types ~~~~~~~~~~~~~ diff --git a/onetl/_metrics/extract.py b/onetl/_metrics/extract.py index 4b058092..bd293779 100644 --- a/onetl/_metrics/extract.py +++ b/onetl/_metrics/extract.py @@ -70,7 +70,7 @@ def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCo disk_spilled_bytes += stage.metrics.disk_spilled_bytes result_size_bytes += stage.metrics.result_size_bytes - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473 + # 
https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473 input_file_count = ( _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ) or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ) diff --git a/onetl/_metrics/listener/base.py b/onetl/_metrics/listener/base.py index bbc6431c..d1b9555d 100644 --- a/onetl/_metrics/listener/base.py +++ b/onetl/_metrics/listener/base.py @@ -16,7 +16,7 @@ class BaseSparkListener: """Base no-op SparkListener implementation. - See `SparkListener `_ interface. + See `SparkListener `_ interface. """ spark: SparkSession diff --git a/onetl/_metrics/listener/execution.py b/onetl/_metrics/listener/execution.py index a0d2a522..ef2e7ffd 100644 --- a/onetl/_metrics/listener/execution.py +++ b/onetl/_metrics/listener/execution.py @@ -22,18 +22,18 @@ class SparkSQLMetricNames(str, Enum): # noqa: WPS338 # Metric names passed to SQLMetrics.createMetric(...) # But only those we're interested in. - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L231 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L231 NUMBER_OF_PARTITIONS_READ = "number of partitions read" - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 NUMBER_OF_FILES_READ = "number of files read" SIZE_OF_FILES_READ = "size of files read" - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 STATIC_NUMBER_OF_FILES_READ = "static number of files read" STATIC_SIZE_OF_FILES_READ = "static size of files read" - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L241-L246 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L241-L246 NUMBER_OF_DYNAMIC_PART = "number of dynamic part" NUMBER_OF_WRITTEN_FILES = "number of written files" @@ -66,11 +66,11 @@ def jobs(self) -> list[SparkListenerJob]: return result def on_execution_start(self, event): - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58 self.status = SparkListenerExecutionStatus.STARTED def on_execution_end(self, event): - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L61-L83 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L61-L83 for job in self._jobs.values(): if job.status == SparkListenerJobStatus.FAILED: self.status = SparkListenerExecutionStatus.FAILED diff --git a/onetl/_metrics/listener/job.py b/onetl/_metrics/listener/job.py index 5581d76e..488282ed 100644 --- a/onetl/_metrics/listener/job.py +++ 
b/onetl/_metrics/listener/job.py @@ -38,8 +38,8 @@ def stages(self) -> list[SparkListenerStage]: @classmethod def create(cls, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerJobSubmitted.html - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerJobCompleted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerJobSubmitted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerJobCompleted.html result = cls( id=event.jobId(), description=event.properties().get("spark.job.description"), diff --git a/onetl/_metrics/listener/listener.py b/onetl/_metrics/listener/listener.py index 04fe53c2..22522432 100644 --- a/onetl/_metrics/listener/listener.py +++ b/onetl/_metrics/listener/listener.py @@ -75,7 +75,7 @@ def onExecutionEnd(self, event): # Get execution metrics from SQLAppStatusStore, # as SparkListenerSQLExecutionEnd event does not provide them: - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala session_status_store = self.spark._jsparkSession.sharedState().statusStore() # noqa: WPS437 raw_execution = session_status_store.execution(execution.id).get() metrics = raw_execution.metrics() diff --git a/onetl/_metrics/listener/stage.py b/onetl/_metrics/listener/stage.py index b858e151..3e233570 100644 --- a/onetl/_metrics/listener/stage.py +++ b/onetl/_metrics/listener/stage.py @@ -21,7 +21,7 @@ def __str__(self): @dataclass class SparkListenerStage: - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/StageInfo.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/StageInfo.html id: int status: SparkListenerStageStatus = SparkListenerStageStatus.PENDING metrics: SparkListenerTaskMetrics = field(default_factory=SparkListenerTaskMetrics, repr=False, init=False) @@ -39,11 +39,11 @@ def create(cls, stage_info): return cls(id=stage_info.stageId()) def on_stage_start(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerStageSubmitted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerStageSubmitted.html self.status = SparkListenerStageStatus.ACTIVE def on_stage_end(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerStageCompleted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerStageCompleted.html stage_info = event.stageInfo() if stage_info.failureReason().isDefined(): self.status = SparkListenerStageStatus.FAILED diff --git a/onetl/_metrics/listener/task.py b/onetl/_metrics/listener/task.py index 5a17ffc5..aecc45fe 100644 --- a/onetl/_metrics/listener/task.py +++ b/onetl/_metrics/listener/task.py @@ -81,14 +81,14 @@ class SparkListenerTask: @classmethod def create(cls, task_info): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/TaskInfo.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/TaskInfo.html return cls(id=task_info.taskId()) def on_task_start(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerTaskStart.html + # 
https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerTaskStart.html self.status = SparkListenerTaskStatus(event.taskInfo().status()) def on_task_end(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html self.status = SparkListenerTaskStatus(event.taskInfo().status()) self.metrics = SparkListenerTaskMetrics.create(event.taskMetrics()) diff --git a/onetl/_util/spark.py b/onetl/_util/spark.py index ab2090b0..9fa592db 100644 --- a/onetl/_util/spark.py +++ b/onetl/_util/spark.py @@ -143,7 +143,7 @@ def estimate_dataframe_size(spark_session: SparkSession, df: DataFrame) -> int: """ Estimate in-memory DataFrame size in bytes. If cannot be estimated, return 0. - Using Spark's `SizeEstimator `_. + Using Spark's `SizeEstimator `_. """ try: size_estimator = spark_session._jvm.org.apache.spark.util.SizeEstimator # type: ignore[union-attr] diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 93f9d821..71fa82bd 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -332,7 +332,7 @@ def write_df_to_target( write_options.update(options.dict(by_alias=True, exclude_none=True, exclude={"if_exists"})) write_options["topic"] = target - # As of Apache Spark version 3.5.2, the mode 'error' is not functioning as expected. + # As of Apache Spark version 3.5.3, the mode 'error' is not functioning as expected. # This issue has been reported and can be tracked at: # https://issues.apache.org/jira/browse/SPARK-44774 mode = options.if_exists diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py index 182955cd..2044ebb6 100644 --- a/onetl/connection/file_df_connection/spark_s3/connection.py +++ b/onetl/connection/file_df_connection/spark_s3/connection.py @@ -133,7 +133,7 @@ class SparkS3(SparkFileDFConnection): from pyspark.sql import SparkSession # Create Spark session with Hadoop AWS libraries loaded - maven_packages = SparkS3.get_packages(spark_version="3.5.2") + maven_packages = SparkS3.get_packages(spark_version="3.5.3") # Some dependencies are not used, but downloading takes a lot of time. Skipping them. 
excluded_packages = [ "com.google.cloud.bigdataoss:gcs-connector", @@ -236,8 +236,8 @@ def get_packages( from onetl.connection import SparkS3 - SparkS3.get_packages(spark_version="3.5.2") - SparkS3.get_packages(spark_version="3.5.2", scala_version="2.12") + SparkS3.get_packages(spark_version="3.5.3") + SparkS3.get_packages(spark_version="3.5.3", scala_version="2.12") """ diff --git a/onetl/file/format/avro.py b/onetl/file/format/avro.py index 1f6e2e0e..426eb30f 100644 --- a/onetl/file/format/avro.py +++ b/onetl/file/format/avro.py @@ -88,7 +88,7 @@ class Avro(ReadWriteFileFormat): from pyspark.sql import SparkSession # Create Spark session with Avro package loaded - maven_packages = Avro.get_packages(spark_version="3.5.2") + maven_packages = Avro.get_packages(spark_version="3.5.3") spark = ( SparkSession.builder.appName("spark-app-name") .config("spark.jars.packages", ",".join(maven_packages)) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index 2e1ad003..508ec829 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -119,7 +119,7 @@ class XML(ReadWriteFileFormat): from pyspark.sql import SparkSession # Create Spark session with XML package loaded - maven_packages = XML.get_packages(spark_version="3.5.2") + maven_packages = XML.get_packages(spark_version="3.5.3") spark = ( SparkSession.builder.appName("spark-app-name") .config("spark.jars.packages", ",".join(maven_packages)) @@ -184,10 +184,10 @@ def get_packages( # noqa: WPS231 from onetl.file.format import XML - XML.get_packages(spark_version="3.5.2") - XML.get_packages(spark_version="3.5.2", scala_version="2.12") + XML.get_packages(spark_version="3.5.3") + XML.get_packages(spark_version="3.5.3", scala_version="2.12") XML.get_packages( - spark_version="3.5.2", + spark_version="3.5.3", scala_version="2.12", package_version="0.18.0", ) diff --git a/requirements/tests/spark-3.5.2.txt b/requirements/tests/spark-3.5.3.txt similarity index 76% rename from requirements/tests/spark-3.5.2.txt rename to requirements/tests/spark-3.5.3.txt index 214f0d63..c5c1408a 100644 --- a/requirements/tests/spark-3.5.2.txt +++ b/requirements/tests/spark-3.5.3.txt @@ -1,5 +1,5 @@ numpy>=1.16 pandas>=1.0 pyarrow>=1.0 -pyspark==3.5.2 +pyspark==3.5.3 sqlalchemy diff --git a/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py b/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py index 53c7a67a..37862cf7 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py @@ -29,14 +29,14 @@ def test_avro_get_packages_scala_version_not_supported(): [ # Detect Scala version by Spark version ("2.4.0", None, "org.apache.spark:spark-avro_2.11:2.4.0"), - ("3.5.2", None, "org.apache.spark:spark-avro_2.12:3.5.2"), + ("3.5.3", None, "org.apache.spark:spark-avro_2.12:3.5.3"), # Override Scala version ("2.4.0", "2.11", "org.apache.spark:spark-avro_2.11:2.4.0"), ("2.4.0", "2.12", "org.apache.spark:spark-avro_2.12:2.4.0"), - ("3.5.2", "2.12", "org.apache.spark:spark-avro_2.12:3.5.2"), - ("3.5.2", "2.13", "org.apache.spark:spark-avro_2.13:3.5.2"), + ("3.5.3", "2.12", "org.apache.spark:spark-avro_2.12:3.5.3"), + ("3.5.3", "2.13", "org.apache.spark:spark-avro_2.13:3.5.3"), # Scala version contain three digits when only two needed - ("3.5.2", "2.12.1", "org.apache.spark:spark-avro_2.12:3.5.2"), + ("3.5.3", "2.12.1", "org.apache.spark:spark-avro_2.12:3.5.3"), ], ) def test_avro_get_packages(spark_version, scala_version, package): diff --git 
a/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py b/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py index ecacb2ca..9f949651 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py @@ -35,17 +35,17 @@ def test_excel_get_packages_package_version_not_supported(): [ # Detect Scala version by Spark version ("3.2.4", None, None, ["com.crealytics:spark-excel_2.12:3.2.4_0.20.4"]), - ("3.5.2", None, None, ["com.crealytics:spark-excel_2.12:3.5.2_0.20.4"]), + ("3.5.3", None, None, ["com.crealytics:spark-excel_2.12:3.5.3_0.20.4"]), # Override Scala version ("3.2.4", "2.12", None, ["com.crealytics:spark-excel_2.12:3.2.4_0.20.4"]), ("3.2.4", "2.13", None, ["com.crealytics:spark-excel_2.13:3.2.4_0.20.4"]), - ("3.5.2", "2.12", None, ["com.crealytics:spark-excel_2.12:3.5.2_0.20.4"]), - ("3.5.2", "2.13", None, ["com.crealytics:spark-excel_2.13:3.5.2_0.20.4"]), + ("3.5.3", "2.12", None, ["com.crealytics:spark-excel_2.12:3.5.3_0.20.4"]), + ("3.5.3", "2.13", None, ["com.crealytics:spark-excel_2.13:3.5.3_0.20.4"]), # Override package version ("3.2.0", None, "0.16.0", ["com.crealytics:spark-excel_2.12:3.2.0_0.16.0"]), - ("3.5.2", None, "0.18.0", ["com.crealytics:spark-excel_2.12:3.5.2_0.18.0"]), + ("3.5.3", None, "0.18.0", ["com.crealytics:spark-excel_2.12:3.5.3_0.18.0"]), # Scala version contain three digits when only two needed - ("3.5.2", "2.12.1", None, ["com.crealytics:spark-excel_2.12:3.5.2_0.20.4"]), + ("3.5.3", "2.12.1", None, ["com.crealytics:spark-excel_2.12:3.5.3_0.20.4"]), ], ) def test_excel_get_packages(caplog, spark_version, scala_version, package_version, packages): diff --git a/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py b/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py index 9a5e6fac..59254361 100644 --- a/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py +++ b/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py @@ -10,9 +10,9 @@ @pytest.mark.parametrize( "spark_version, scala_version, package", [ - ("3.5.2", None, "org.apache.spark:spark-hadoop-cloud_2.12:3.5.2"), - ("3.5.2", "2.12", "org.apache.spark:spark-hadoop-cloud_2.12:3.5.2"), - ("3.5.2", "2.13", "org.apache.spark:spark-hadoop-cloud_2.13:3.5.2"), + ("3.5.3", None, "org.apache.spark:spark-hadoop-cloud_2.12:3.5.3"), + ("3.5.3", "2.12", "org.apache.spark:spark-hadoop-cloud_2.12:3.5.3"), + ("3.5.3", "2.13", "org.apache.spark:spark-hadoop-cloud_2.13:3.5.3"), ], ) def test_spark_s3_get_packages(spark_version, scala_version, package): From c3f4d243a32e2cb0d99519d7fde024ba5150ad7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 12:55:34 +0000 Subject: [PATCH 06/28] Fix MSSQL tests --- .github/workflows/data/mssql/matrix.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index 490bc1fb..96a84dce 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -4,7 +4,7 @@ min: &min pydantic-version: 1 python-version: '3.7' java-version: 8 - os: ubuntu-latest + os: ubuntu-20.04 max: &max mssql-version: 2022-CU14-ubuntu-22.04 From a7c3885e667bd46376a4c9dd77c520a4de760f43 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 13:34:52 +0000 Subject: [PATCH 07/28] Fix MSSQL tests --- .github/workflows/data/mssql/matrix.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index 96a84dce..209b110b 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -1,13 +1,13 @@ min: &min - mssql-version: 2017-GA-ubuntu + mssql-version: 2017-latest spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 - os: ubuntu-20.04 + os: ubuntu-latest max: &max - mssql-version: 2022-CU14-ubuntu-22.04 + mssql-version: 2022-latest spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' From 243662f02c8dc133b289dd886076367c00307e09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 13:50:07 +0000 Subject: [PATCH 08/28] Fix MSSQL tests --- .github/workflows/data/mssql/matrix.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index 209b110b..d6c3c8d0 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -4,7 +4,7 @@ min: &min pydantic-version: 1 python-version: '3.7' java-version: 8 - os: ubuntu-latest + os: ubuntu-22.04 max: &max mssql-version: 2022-latest From da24f60d15cbebef1970f94478121fd3d102703a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Sep 2024 14:07:07 +0000 Subject: [PATCH 09/28] Fix MSSQL tests --- .github/workflows/data/mssql/matrix.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index d6c3c8d0..978faa17 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -4,7 +4,7 @@ min: &min pydantic-version: 1 python-version: '3.7' java-version: 8 - os: ubuntu-22.04 + os: ubuntu-20.04 max: &max mssql-version: 2022-latest From 304aaf49e2c045895a9a5629e6482f365680da3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Sep 2024 09:45:42 +0000 Subject: [PATCH 10/28] [DOP-20393] Reduce number of active namenode checks for HDFS --- .../file_connection/file_connection.py | 7 ++++- .../file_connection/hdfs/connection.py | 29 +++++++++++++++---- .../spark_hdfs/connection.py | 16 +++++++--- 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/onetl/connection/file_connection/file_connection.py b/onetl/connection/file_connection/file_connection.py index de5916c4..c85eead4 100644 --- a/onetl/connection/file_connection/file_connection.py +++ b/onetl/connection/file_connection/file_connection.py @@ -11,6 +11,11 @@ from humanize import naturalsize +try: + from pydantic.v1 import PrivateAttr +except (ImportError, AttributeError): 
+ from pydantic import PrivateAttr # type: ignore[no-redef, assignment] + from onetl.base import ( BaseFileConnection, BaseFileFilter, @@ -41,7 +46,7 @@ @support_hooks class FileConnection(BaseFileConnection, FrozenModel): - _clients_cache: Any = None + _clients_cache: Any = PrivateAttr(default=None) @property def client(self): diff --git a/onetl/connection/file_connection/hdfs/connection.py b/onetl/connection/file_connection/hdfs/connection.py index 4216f80f..ead8f6a4 100644 --- a/onetl/connection/file_connection/hdfs/connection.py +++ b/onetl/connection/file_connection/hdfs/connection.py @@ -6,14 +6,22 @@ import stat import textwrap from logging import getLogger +from threading import Lock from typing import TYPE_CHECKING, Optional, Tuple from etl_entities.instance import Cluster, Host try: - from pydantic.v1 import Field, FilePath, SecretStr, root_validator, validator + from pydantic.v1 import ( + Field, + FilePath, + PrivateAttr, + SecretStr, + root_validator, + validator, + ) except (ImportError, AttributeError): - from pydantic import Field, FilePath, SecretStr, root_validator, validator # type: ignore[no-redef, assignment] + from pydantic import Field, FilePath, SecretStr, PrivateAttr, root_validator, validator # type: ignore[no-redef, assignment] from onetl.base import PathStatProtocol from onetl.connection.file_connection.file_connection import FileConnection @@ -212,6 +220,9 @@ class HDFS(FileConnection, RenameDirMixin): # TODO: remove in v1.0.0 slots = Slots + _active_host_lock: Lock = PrivateAttr(default_factory=Lock) + _active_host: Optional[Host] = PrivateAttr(default=None) + @slot @classmethod def get_current(cls, **kwargs): @@ -412,10 +423,15 @@ def _get_host(self) -> str: raise RuntimeError(f"Host {self.host!r} is not an active namenode") - def _get_client(self) -> Client: - host = self._get_host() - conn_str = f"http://{host}:{self.webhdfs_port}" # NOSONAR + def _get_conn_str(self) -> str: + # cache active host to reduce number of requests. + # acquire a lock to avoid sending the same request for each thread. 
+ with self._active_host_lock: + if not self._active_host: + self._active_host = self._get_host() + return f"http://{self._active_host}:{self.webhdfs_port}" + def _get_client(self) -> Client: if self.user and (self.keytab or self.password): from hdfs.ext.kerberos import KerberosClient # noqa: F811 @@ -424,10 +440,13 @@ def _get_client(self) -> Client: keytab=self.keytab, password=self.password.get_secret_value() if self.password else None, ) + # checking if namenode is active requires a Kerberos ticket + conn_str = self._get_conn_str() client = KerberosClient(conn_str, timeout=self.timeout) else: from hdfs import InsecureClient # noqa: F401, WPS442, F811 + conn_str = self._get_conn_str() client = InsecureClient(conn_str, user=self.user) return client diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index ab13b530..80953d67 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -7,6 +7,7 @@ import os from contextlib import suppress from pathlib import Path +from threading import Lock from typing import TYPE_CHECKING, Optional from etl_entities.instance import Cluster, Host @@ -154,6 +155,7 @@ class SparkHDFS(SparkFileDFConnection): host: Optional[Host] = None ipc_port: int = Field(default=8020, alias="port") + _active_host_lock: Lock = PrivateAttr(default_factory=Lock) _active_host: Optional[Host] = PrivateAttr(default=None) @slot @@ -341,11 +343,17 @@ def _get_host(self) -> str: raise RuntimeError(f"Host {self.host!r} is not an active namenode of cluster {self.cluster!r}") + def _get_conn_str(self) -> str: + # cache active host to reduce number of requests. + # acquire a lock to avoid sending the same request for each thread. 
+ with self._active_host_lock: + if not self._active_host: + self._active_host = self._get_host() + return f"hdfs://{self._active_host}:{self.ipc_port}" + def _convert_to_url(self, path: PurePathProtocol) -> str: - # "hdfs://namenode:8020/absolute/path" if host is set - if not self._active_host: - self._active_host = self._get_host() - return f"hdfs://{self._active_host}:{self.ipc_port}" + path.as_posix() + # "hdfs://namenode:8020/absolute/path" + return self._get_conn_str() + path.as_posix() def _get_default_path(self): return RemotePath("/user") / getpass.getuser() From b961120692b1d2e94bcae6d81d9eb089efb64e29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Sep 2024 10:46:41 +0000 Subject: [PATCH 11/28] [DOP-20393] Fix flake8 error --- onetl/connection/file_df_connection/spark_hdfs/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index 80953d67..a4cbfa33 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -352,7 +352,7 @@ def _get_conn_str(self) -> str: return f"hdfs://{self._active_host}:{self.ipc_port}" def _convert_to_url(self, path: PurePathProtocol) -> str: - # "hdfs://namenode:8020/absolute/path" + # example: "hdfs://namenode:8020/absolute/path" return self._get_conn_str() + path.as_posix() def _get_default_path(self): From 5127a05f6d2cdab0687e55f86c2823efc9b9148e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Sep 2024 11:05:28 +0000 Subject: [PATCH 12/28] [DOP-20393] Fix flake8 error --- onetl/connection/file_df_connection/spark_hdfs/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index a4cbfa33..4de806de 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -352,7 +352,7 @@ def _get_conn_str(self) -> str: return f"hdfs://{self._active_host}:{self.ipc_port}" def _convert_to_url(self, path: PurePathProtocol) -> str: - # example: "hdfs://namenode:8020/absolute/path" + # example "hdfs://namenode:8020/absolute/path" return self._get_conn_str() + path.as_posix() def _get_default_path(self): From 41888083b6f75850289490c49703c866ec0f4174 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Sep 2024 14:24:16 +0000 Subject: [PATCH 13/28] [DOP-20162] Change estimate_dataframe_size signature --- onetl/_util/spark.py | 3 ++- onetl/connection/db_connection/jdbc_mixin/connection.py | 4 ++-- tests/tests_unit/tests_db_connection_unit/test_hive_unit.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/onetl/_util/spark.py b/onetl/_util/spark.py index 9fa592db..6e877a87 100644 --- a/onetl/_util/spark.py +++ b/onetl/_util/spark.py @@ -139,12 +139,13 @@ def 
get_spark_version(spark_session: SparkSession) -> Version: return Version(spark_session.version) -def estimate_dataframe_size(spark_session: SparkSession, df: DataFrame) -> int: +def estimate_dataframe_size(df: DataFrame) -> int: """ Estimate in-memory DataFrame size in bytes. If cannot be estimated, return 0. Using Spark's `SizeEstimator `_. """ + spark_session = df._session try: size_estimator = spark_session._jvm.org.apache.spark.util.SizeEstimator # type: ignore[union-attr] return size_estimator.estimate(df._jdf) diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index a6830ae4..a2f1d192 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -228,7 +228,7 @@ def fetch( # Just create metrics by hand, and fill them up using information based on dataframe content. metrics = SparkCommandMetrics() metrics.input.read_rows = df.count() - metrics.driver.in_memory_bytes = estimate_dataframe_size(self.spark, df) + metrics.driver.in_memory_bytes = estimate_dataframe_size(df) log.info("|%s| Recorded metrics:", self.__class__.__name__) log_lines(log, str(metrics)) return df @@ -304,7 +304,7 @@ def execute( # Just create metrics by hand, and fill them up using information based on dataframe content. metrics = SparkCommandMetrics() metrics.input.read_rows = df.count() - metrics.driver.in_memory_bytes = estimate_dataframe_size(self.spark, df) + metrics.driver.in_memory_bytes = estimate_dataframe_size(df) log.info("|%s| Recorded metrics:", self.__class__.__name__) log_lines(log, str(metrics)) diff --git a/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py b/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py index 891406ba..f0a25216 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py @@ -65,7 +65,7 @@ def normalize_cluster_name(cluster: str) -> str: assert Hive(cluster="RND-DWH", spark=spark_mock).cluster == "rnd-dwh" -def test_hive_known_get_current_cluster_hook(request, spark_mock, mocker): +def test_hive_known_get_current_cluster_hook(request, spark_mock): # no exception Hive(cluster="rnd-prod", spark=spark_mock).check() Hive(cluster="rnd-dwh", spark=spark_mock).check() From d4f3a1fe970ef31981f87f7817d15ed72b7a6b78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Sep 2024 14:41:39 +0000 Subject: [PATCH 14/28] [DOP-20162] Change estimate_dataframe_size signature --- onetl/_util/spark.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/onetl/_util/spark.py b/onetl/_util/spark.py index 6e877a87..3265e2c4 100644 --- a/onetl/_util/spark.py +++ b/onetl/_util/spark.py @@ -145,9 +145,10 @@ def estimate_dataframe_size(df: DataFrame) -> int: Using Spark's `SizeEstimator `_. 
""" - spark_session = df._session + try: - size_estimator = spark_session._jvm.org.apache.spark.util.SizeEstimator # type: ignore[union-attr] + spark_context = df._sc + size_estimator = spark_context._jvm.org.apache.spark.util.SizeEstimator # type: ignore[union-attr] return size_estimator.estimate(df._jdf) except Exception: # SizeEstimator uses Java reflection which may behave differently in different Java versions, From 366b193bedd8090190ffc3731fbae22beb57ba8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Fri, 27 Sep 2024 14:38:34 +0000 Subject: [PATCH 15/28] [DOP-20393] Forget about active namenode after closing HDFS connection --- onetl/connection/file_connection/hdfs/connection.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/onetl/connection/file_connection/hdfs/connection.py b/onetl/connection/file_connection/hdfs/connection.py index ead8f6a4..f64d4fad 100644 --- a/onetl/connection/file_connection/hdfs/connection.py +++ b/onetl/connection/file_connection/hdfs/connection.py @@ -5,6 +5,7 @@ import os import stat import textwrap +from contextlib import suppress from logging import getLogger from threading import Lock from typing import TYPE_CHECKING, Optional, Tuple @@ -284,6 +285,14 @@ def __str__(self): def path_exists(self, path: os.PathLike | str) -> bool: return self.client.status(os.fspath(path), strict=False) + @slot + def close(self): + super().close() + + with suppress(Exception): + self._active_host = None + return self + @validator("user", pre=True) def _validate_packages(cls, user): if user: From 23e5926592de62265de4168fe36cdcc70a65234f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Fri, 27 Sep 2024 15:32:51 +0000 Subject: [PATCH 16/28] [DOP-20393] Use timedelta factory in SparkExecutorMetrics --- onetl/_metrics/executor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/onetl/_metrics/executor.py b/onetl/_metrics/executor.py index bbb6d732..819f220d 100644 --- a/onetl/_metrics/executor.py +++ b/onetl/_metrics/executor.py @@ -7,12 +7,17 @@ from humanize import naturalsize, precisedelta +try: + from pydantic.v1 import Field +except (ImportError, AttributeError): + from pydantic import Field # type: ignore[no-redef, assignment] + from onetl.impl import BaseModel class SparkExecutorMetrics(BaseModel): - total_run_time: timedelta = timedelta() - total_cpu_time: timedelta = timedelta() + total_run_time: timedelta = Field(default_factory=timedelta) + total_cpu_time: timedelta = Field(default_factory=timedelta) peak_memory_bytes: int = 0 memory_spilled_bytes: int = 0 disk_spilled_bytes: int = 0 From e3ee8a441a314f6d4cec9859a7757f9d6565ff8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 30 Sep 2024 07:38:00 +0000 Subject: [PATCH 17/28] [DOP-20393] Remove lock for HDFS active namenode check --- onetl/connection/file_connection/hdfs/connection.py | 8 ++------ .../file_df_connection/spark_hdfs/connection.py | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git 
a/onetl/connection/file_connection/hdfs/connection.py b/onetl/connection/file_connection/hdfs/connection.py index f64d4fad..74eae72d 100644 --- a/onetl/connection/file_connection/hdfs/connection.py +++ b/onetl/connection/file_connection/hdfs/connection.py @@ -7,7 +7,6 @@ import textwrap from contextlib import suppress from logging import getLogger -from threading import Lock from typing import TYPE_CHECKING, Optional, Tuple from etl_entities.instance import Cluster, Host @@ -221,7 +220,6 @@ class HDFS(FileConnection, RenameDirMixin): # TODO: remove in v1.0.0 slots = Slots - _active_host_lock: Lock = PrivateAttr(default_factory=Lock) _active_host: Optional[Host] = PrivateAttr(default=None) @slot @@ -434,10 +432,8 @@ def _get_host(self) -> str: def _get_conn_str(self) -> str: # cache active host to reduce number of requests. - # acquire a lock to avoid sending the same request for each thread. - with self._active_host_lock: - if not self._active_host: - self._active_host = self._get_host() + if not self._active_host: + self._active_host = self._get_host() return f"http://{self._active_host}:{self.webhdfs_port}" def _get_client(self) -> Client: diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index 4de806de..12f17504 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -7,7 +7,6 @@ import os from contextlib import suppress from pathlib import Path -from threading import Lock from typing import TYPE_CHECKING, Optional from etl_entities.instance import Cluster, Host @@ -155,7 +154,6 @@ class SparkHDFS(SparkFileDFConnection): host: Optional[Host] = None ipc_port: int = Field(default=8020, alias="port") - _active_host_lock: Lock = PrivateAttr(default_factory=Lock) _active_host: Optional[Host] = PrivateAttr(default=None) @slot @@ -345,10 +343,8 @@ def _get_host(self) -> str: def _get_conn_str(self) -> str: # cache active host to reduce number of requests. - # acquire a lock to avoid sending the same request for each thread. - with self._active_host_lock: - if not self._active_host: - self._active_host = self._get_host() + if not self._active_host: + self._active_host = self._get_host() return f"hdfs://{self._active_host}:{self.ipc_port}" def _convert_to_url(self, path: PurePathProtocol) -> str: From f8c6d3fe4aaaff9799f2ded9d6bfaa07449b55ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 30 Sep 2024 13:51:48 +0000 Subject: [PATCH 18/28] [DOP-20393] Avoid multithreading issues with calling kinit function --- docs/changelog/next_release/+.bugfix.hdfs.rst | 1 + .../changelog/next_release/+.bugfix.kinit.rst | 1 + onetl/connection/kerberos_helpers.py | 32 +++++++++++-------- 3 files changed, 20 insertions(+), 14 deletions(-) create mode 100644 docs/changelog/next_release/+.bugfix.hdfs.rst create mode 100644 docs/changelog/next_release/+.bugfix.kinit.rst diff --git a/docs/changelog/next_release/+.bugfix.hdfs.rst b/docs/changelog/next_release/+.bugfix.hdfs.rst new file mode 100644 index 00000000..ed35c63e --- /dev/null +++ b/docs/changelog/next_release/+.bugfix.hdfs.rst @@ -0,0 +1 @@ +Call ``kinit`` before checking for HDFS active namenode. 
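The kerberos_helpers.py change below serializes every ``kinit`` call behind a single module-level lock, because concurrent ``kinit`` processes can race on the shared Kerberos credential cache. A minimal sketch of the situation this protects against (several worker threads creating Kerberized HDFS clients at once) is shown here; the connection parameters are illustrative only and are not taken from the patch:

.. code:: python

    from concurrent.futures import ThreadPoolExecutor

    from onetl.connection import HDFS


    def check_connection(_):
        # hypothetical connection parameters, for illustration only
        hdfs = HDFS(
            cluster="rnd-dwh",
            host="namenode1.domain.com",
            user="someuser",
            password="***",
        )
        # creating the WebHDFS client for a Kerberized cluster calls kinit;
        # with the module-level lock, concurrent workers run kinit one at a
        # time instead of racing on the shared credential cache
        return hdfs.check()


    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(check_connection, range(4)))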
diff --git a/docs/changelog/next_release/+.bugfix.kinit.rst b/docs/changelog/next_release/+.bugfix.kinit.rst new file mode 100644 index 00000000..45687b1c --- /dev/null +++ b/docs/changelog/next_release/+.bugfix.kinit.rst @@ -0,0 +1 @@ +Wrap ``kinit`` with ``threading.Lock`` to avoid multithreading issues. diff --git a/onetl/connection/kerberos_helpers.py b/onetl/connection/kerberos_helpers.py index b1dd5019..6d8b11d9 100644 --- a/onetl/connection/kerberos_helpers.py +++ b/onetl/connection/kerberos_helpers.py @@ -5,35 +5,39 @@ import os import subprocess from logging import getLogger +from threading import Lock from onetl._util.file import is_file_readable log = getLogger(__name__) +_kinit_lock = Lock() def kinit_keytab(user: str, keytab: str | os.PathLike) -> None: path = is_file_readable(keytab) - cmd = ["kinit", user, "-k", "-t", os.fspath(path)] - log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) - subprocess.check_call(cmd) + with _kinit_lock: + cmd = ["kinit", user, "-k", "-t", os.fspath(path)] + log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) + subprocess.check_call(cmd) def kinit_password(user: str, password: str) -> None: cmd = ["kinit", user] log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - ) - - _stdout, stderr = proc.communicate(password.encode("utf-8")) - exit_code = proc.poll() - if exit_code: - raise subprocess.CalledProcessError(exit_code, cmd, output=stderr) + with _kinit_lock: + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + ) + + _stdout, stderr = proc.communicate(password.encode("utf-8")) + exit_code = proc.poll() + if exit_code: + raise subprocess.CalledProcessError(exit_code, cmd, output=stderr) def kinit(user: str, keytab: os.PathLike | None = None, password: str | None = None) -> None: From 45c6bb8ffba3914b21cb5819ac03222a5cdabb75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 1 Oct 2024 11:16:43 +0000 Subject: [PATCH 19/28] [DOP-16941] Add spark-dialect-extension package to Clickhouse documentation --- .../next_release/{+.bugfix.hdfs.rst => +.bugfix.1.rst} | 0 .../{+.bugfix.kinit.rst => +.bugfix.2.rst} | 0 docs/changelog/next_release/310.doc.rst | 1 + docs/connection/db_connection/clickhouse/types.rst | 10 ++++++++++ docs/connection/db_connection/greenplum/types.rst | 4 ++++ docs/connection/db_connection/mongodb/types.rst | 4 ++++ docs/connection/db_connection/mssql/types.rst | 6 +++++- docs/connection/db_connection/mysql/types.rst | 6 +++++- docs/connection/db_connection/oracle/types.rst | 4 ++++ docs/connection/db_connection/postgres/types.rst | 6 +++++- 10 files changed, 38 insertions(+), 3 deletions(-) rename docs/changelog/next_release/{+.bugfix.hdfs.rst => +.bugfix.1.rst} (100%) rename docs/changelog/next_release/{+.bugfix.kinit.rst => +.bugfix.2.rst} (100%) create mode 100644 docs/changelog/next_release/310.doc.rst diff --git a/docs/changelog/next_release/+.bugfix.hdfs.rst b/docs/changelog/next_release/+.bugfix.1.rst similarity index 100% rename from docs/changelog/next_release/+.bugfix.hdfs.rst rename to docs/changelog/next_release/+.bugfix.1.rst diff --git a/docs/changelog/next_release/+.bugfix.kinit.rst 
b/docs/changelog/next_release/+.bugfix.2.rst similarity index 100% rename from docs/changelog/next_release/+.bugfix.kinit.rst rename to docs/changelog/next_release/+.bugfix.2.rst diff --git a/docs/changelog/next_release/310.doc.rst b/docs/changelog/next_release/310.doc.rst new file mode 100644 index 00000000..469ccf67 --- /dev/null +++ b/docs/changelog/next_release/310.doc.rst @@ -0,0 +1 @@ +Add note about `spark-dialect-extension `_ package to Clickhouse connector documentation. diff --git a/docs/connection/db_connection/clickhouse/types.rst b/docs/connection/db_connection/clickhouse/types.rst index 6135e066..4f2dbf3f 100644 --- a/docs/connection/db_connection/clickhouse/types.rst +++ b/docs/connection/db_connection/clickhouse/types.rst @@ -3,6 +3,16 @@ Clickhouse <-> Spark type mapping ================================= +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. + +.. note:: + + It is recommended to use `spark-dialect-extension `_ package, + which implements writing Arrays from Spark to Clickhouse, fixes dropping fractions of seconds in ``TimestampType``, + and fixes other type conversion issues. + Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/greenplum/types.rst b/docs/connection/db_connection/greenplum/types.rst index 3c0e664b..01282159 100644 --- a/docs/connection/db_connection/greenplum/types.rst +++ b/docs/connection/db_connection/greenplum/types.rst @@ -3,6 +3,10 @@ Greenplum <-> Spark type mapping ================================= +.. note:: + + The results below are valid for Spark 3.2.4, and may differ on other Spark versions. + Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/mongodb/types.rst b/docs/connection/db_connection/mongodb/types.rst index f701ac93..91a64a88 100644 --- a/docs/connection/db_connection/mongodb/types.rst +++ b/docs/connection/db_connection/mongodb/types.rst @@ -3,6 +3,10 @@ MongoDB <-> Spark type mapping ============================== +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. + Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/mssql/types.rst b/docs/connection/db_connection/mssql/types.rst index 3a8b2d36..13c7874a 100644 --- a/docs/connection/db_connection/mssql/types.rst +++ b/docs/connection/db_connection/mssql/types.rst @@ -1,7 +1,11 @@ .. _mssql-types: MSSQL <-> Spark type mapping -================================= +============================ + +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/mysql/types.rst b/docs/connection/db_connection/mysql/types.rst index 71795925..a828cb41 100644 --- a/docs/connection/db_connection/mysql/types.rst +++ b/docs/connection/db_connection/mysql/types.rst @@ -1,7 +1,11 @@ .. _mysql-types: MySQL <-> Spark type mapping -================================= +============================ + +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. 
Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/oracle/types.rst b/docs/connection/db_connection/oracle/types.rst index 28ece909..b5277660 100644 --- a/docs/connection/db_connection/oracle/types.rst +++ b/docs/connection/db_connection/oracle/types.rst @@ -3,6 +3,10 @@ Oracle <-> Spark type mapping ============================= +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. + Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/postgres/types.rst b/docs/connection/db_connection/postgres/types.rst index e47336f0..7eea2850 100644 --- a/docs/connection/db_connection/postgres/types.rst +++ b/docs/connection/db_connection/postgres/types.rst @@ -1,7 +1,11 @@ .. _postgres-types: Postgres <-> Spark type mapping -================================= +=============================== + +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. Type detection & casting ------------------------ From 97779a1b2ad364c6a4fcdb9ad45d28c6affe7a61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 2 Oct 2024 13:15:10 +0000 Subject: [PATCH 20/28] [DOP-16999] Log detected JDBC dialect while using DBWriter --- docs/changelog/next_release/+.feature.1.rst | 1 + onetl/connection/db_connection/jdbc_connection/connection.py | 1 + 2 files changed, 2 insertions(+) create mode 100644 docs/changelog/next_release/+.feature.1.rst diff --git a/docs/changelog/next_release/+.feature.1.rst b/docs/changelog/next_release/+.feature.1.rst new file mode 100644 index 00000000..e48a1038 --- /dev/null +++ b/docs/changelog/next_release/+.feature.1.rst @@ -0,0 +1 @@ +Log detected JDBC dialect while using ``DBWriter``. 
diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py index 0ea3078c..5bb5811c 100644 --- a/onetl/connection/db_connection/jdbc_connection/connection.py +++ b/onetl/connection/db_connection/jdbc_connection/connection.py @@ -187,6 +187,7 @@ def write_df_to_target( else write_options.if_exists.value ) log.info("|%s| Saving data to a table %r", self.__class__.__name__, target) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) df.write.format("jdbc").mode(mode).options(dbtable=target, **jdbc_properties).save() log.info("|%s| Table %r successfully written", self.__class__.__name__, target) From dd89c4c35ffd0728845452f2351d2a27ac760b1e Mon Sep 17 00:00:00 2001 From: Maxim Liksakov <67663774+maxim-lixakov@users.noreply.github.com> Date: Mon, 7 Oct 2024 10:57:18 +0300 Subject: [PATCH 21/28] Update prerequisites.rst --- docs/connection/db_connection/greenplum/prerequisites.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connection/db_connection/greenplum/prerequisites.rst b/docs/connection/db_connection/greenplum/prerequisites.rst index b3cf52d7..545b8399 100644 --- a/docs/connection/db_connection/greenplum/prerequisites.rst +++ b/docs/connection/db_connection/greenplum/prerequisites.rst @@ -93,7 +93,7 @@ Number of connections can be limited by 2 ways: spark = ( SparkSession.builder - # Spark will start EXACTLY 5 executors with 1 core each, so max number of parallel jobs is 10 + # Spark will run with 5 threads in local mode, allowing up to 5 parallel tasks .config("spark.master", "local[5]") .config("spark.executor.cores", 1) ).getOrCreate() From 8cb94537e98a62f5f4aa63fbf7a5695557ffdedb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 7 Oct 2024 22:59:35 +0000 Subject: [PATCH 22/28] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/pre-commit/pre-commit-hooks: v4.6.0 → v5.0.0](https://github.com/pre-commit/pre-commit-hooks/compare/v4.6.0...v5.0.0) - [github.com/psf/black: 24.8.0 → 24.10.0](https://github.com/psf/black/compare/24.8.0...24.10.0) - [github.com/asottile/blacken-docs: 1.18.0 → 1.19.0](https://github.com/asottile/blacken-docs/compare/1.18.0...1.19.0) --- .pre-commit-config.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index af00fcc4..7455ef11 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ default_language_version: repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 + rev: v5.0.0 hooks: - id: check-ast - id: check-case-conflict @@ -101,13 +101,13 @@ repos: - id: add-trailing-comma - repo: https://github.com/psf/black - rev: 24.8.0 + rev: 24.10.0 hooks: - id: black language_version: python3 - repo: https://github.com/asottile/blacken-docs - rev: 1.18.0 + rev: 1.19.0 hooks: - id: blacken-docs additional_dependencies: From c18a4f233c1ea417dae011a6e62277a4ef1f4efa Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 14 Oct 2024 21:48:19 +0000 Subject: [PATCH 23/28] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/asottile/pyupgrade: v3.17.0 → 
v3.18.0](https://github.com/asottile/pyupgrade/compare/v3.17.0...v3.18.0) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7455ef11..6d2bb684 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -90,7 +90,7 @@ repos: - id: text-unicode-replacement-char - repo: https://github.com/asottile/pyupgrade - rev: v3.17.0 + rev: v3.18.0 hooks: - id: pyupgrade args: [--py37-plus, --keep-runtime-typing] From a97e63bb1ed189b5d682d03d88492e7823aa3734 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 17 Oct 2024 09:28:15 +0000 Subject: [PATCH 24/28] [DOP-20817] Handle case taskMetrics=null --- docs/changelog/next_release/313.bugfix.rst | 1 + onetl/_metrics/listener/task.py | 2 + .../test_spark_metrics_recorder_file_df.py | 59 ++++++++++++++++++ .../test_spark_metrics_recorder_hive.py | 48 ++++++++++++++ .../test_spark_metrics_recorder_postgres.py | 62 +++++++++++++++++++ 5 files changed, 172 insertions(+) create mode 100644 docs/changelog/next_release/313.bugfix.rst diff --git a/docs/changelog/next_release/313.bugfix.rst b/docs/changelog/next_release/313.bugfix.rst new file mode 100644 index 00000000..5647f762 --- /dev/null +++ b/docs/changelog/next_release/313.bugfix.rst @@ -0,0 +1 @@ +Fix ``SparkMetricsRecorder`` failing when receiving ``SparkListenerTaskEnd`` without ``taskMetrics`` (e.g. executor was killed by OOM). diff --git a/onetl/_metrics/listener/task.py b/onetl/_metrics/listener/task.py index aecc45fe..f979dbaa 100644 --- a/onetl/_metrics/listener/task.py +++ b/onetl/_metrics/listener/task.py @@ -61,6 +61,8 @@ class SparkListenerTaskMetrics: @classmethod def create(cls, task_metrics): + if not task_metrics: + return cls() return cls( executor_run_time_milliseconds=task_metrics.executorRunTime(), executor_cpu_time_nanoseconds=task_metrics.executorCpuTime(), diff --git a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py index f59acf89..14d88250 100644 --- a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py +++ b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py @@ -169,3 +169,62 @@ def test_spark_metrics_recorder_file_df_writer_empty_input( metrics = recorder.metrics() assert not metrics.output.written_rows assert not metrics.output.written_bytes + + +def test_spark_metrics_recorder_file_df_writer_driver_failed( + spark, + local_fs_file_df_connection_with_path, + file_df_dataframe, +): + local_fs, target_path = local_fs_file_df_connection_with_path + + df = file_df_dataframe + + writer = FileDFWriter( + connection=local_fs, + format=CSV(), + target_path=target_path, + options=FileDFWriter.Options(if_exists="error"), + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + assert not metrics.output.written_bytes + + +def test_spark_metrics_recorder_file_df_writer_executor_failed( + spark, + local_fs_file_df_connection_with_path, + file_df_dataframe, +): + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + @udf(returnType=IntegerType()) + 
def raise_exception(): + raise ValueError("Force task failure") + + local_fs, target_path = local_fs_file_df_connection_with_path + + failing_df = file_df_dataframe.select(raise_exception().alias("some")) + + writer = FileDFWriter( + connection=local_fs, + format=CSV(), + target_path=target_path, + options=FileDFWriter.Options(if_exists="append"), + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(failing_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + assert not metrics.output.written_bytes diff --git a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py index 7e8dc218..79116a3e 100644 --- a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py +++ b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py @@ -1,4 +1,5 @@ import time +from contextlib import suppress import pytest @@ -137,6 +138,53 @@ def test_spark_metrics_recorder_hive_write_empty(spark, processing, get_schema_t assert not metrics.output.written_rows +def test_spark_metrics_recorder_hive_write_driver_failed(spark, processing, prepare_schema_table): + df = processing.create_spark_df(spark).limit(0) + + mismatch_df = df.withColumn("mismatch", df.id_int) + + hive = Hive(cluster="rnd-dwh", spark=spark) + writer = DBWriter( + connection=hive, + target=prepare_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(mismatch_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + +def test_spark_metrics_recorder_hive_write_executor_failed(spark, processing, get_schema_table): + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + df = processing.create_spark_df(spark).limit(0) + + @udf(returnType=IntegerType()) + def raise_exception(): + raise ValueError("Force task failure") + + failing_df = df.select(raise_exception().alias("some")) + + hive = Hive(cluster="rnd-dwh", spark=spark) + writer = DBWriter( + connection=hive, + target=get_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(failing_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + def test_spark_metrics_recorder_hive_execute(request, spark, processing, get_schema_table): df = processing.create_spark_df(spark) view_name = rand_str() diff --git a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py index 67e31591..aa391eb8 100644 --- a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py +++ b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py @@ -1,4 +1,5 @@ import time +from contextlib import suppress import pytest @@ -167,6 +168,67 @@ def test_spark_metrics_recorder_postgres_write_empty(spark, processing, get_sche assert not metrics.output.written_rows +def test_spark_metrics_recorder_postgres_write_driver_failed(spark, processing, prepare_schema_table): + postgres = Postgres( + host=processing.host, + port=processing.port, + user=processing.user, + password=processing.password, + 
database=processing.database, + spark=spark, + ) + df = processing.create_spark_df(spark).limit(0) + + mismatch_df = df.withColumn("mismatch", df.id_int) + + writer = DBWriter( + connection=postgres, + target=prepare_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(mismatch_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + +def test_spark_metrics_recorder_postgres_write_executor_failed(spark, processing, get_schema_table): + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + postgres = Postgres( + host=processing.host, + port=processing.port, + user=processing.user, + password=processing.password, + database=processing.database, + spark=spark, + ) + + @udf(returnType=IntegerType()) + def raise_exception(): + raise ValueError("Force task failure") + + df = processing.create_spark_df(spark).limit(0) + failing_df = df.select(raise_exception().alias("some")) + + writer = DBWriter( + connection=postgres, + target=get_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(failing_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + def test_spark_metrics_recorder_postgres_fetch(spark, processing, load_table_data): postgres = Postgres( host=processing.host, From 6b29329b2d2bddfa721c3c47e4ca52eb0855668e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 17 Oct 2024 09:54:24 +0000 Subject: [PATCH 25/28] [DOP-20674] Replace ImportError with AttributeError in __getattr__ --- docs/changelog/next_release/+.bugfix.3.rst | 1 + onetl/connection/__init__.py | 2 +- onetl/core/__init__.py | 2 +- onetl/hwm/store/__init__.py | 2 +- onetl/strategy/hwm_store/__init__.py | 2 +- 5 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 docs/changelog/next_release/+.bugfix.3.rst diff --git a/docs/changelog/next_release/+.bugfix.3.rst b/docs/changelog/next_release/+.bugfix.3.rst new file mode 100644 index 00000000..f8f555e6 --- /dev/null +++ b/docs/changelog/next_release/+.bugfix.3.rst @@ -0,0 +1 @@ +Use ``AttributeError`` instead of ``ImportError`` in module's ``__getattr__`` method, to make code compliant with Python spec. 
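Per PEP 562, a module-level ``__getattr__`` is expected to raise ``AttributeError`` for unknown names; raising ``ImportError`` instead breaks ``getattr(module, name, default)`` and ``hasattr(module, name)``, which only swallow ``AttributeError``. A minimal sketch of the expected pattern (module and attribute names are illustrative, not onetl's):

    # example_pkg/__init__.py -- minimal PEP 562 lazy-import sketch
    from importlib import import_module

    _lazy_exports = {"SomeConnection": "example_pkg._impl"}  # hypothetical mapping

    def __getattr__(name: str):
        if name in _lazy_exports:
            return getattr(import_module(_lazy_exports[name]), name)
        # AttributeError (not ImportError) keeps getattr()/hasattr() semantics intact
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
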
diff --git a/onetl/connection/__init__.py b/onetl/connection/__init__.py index 608beb41..03402fd0 100644 --- a/onetl/connection/__init__.py +++ b/onetl/connection/__init__.py @@ -76,4 +76,4 @@ def __getattr__(name: str): submodule = file_df_connections_modules[name] return getattr(import_module(f"onetl.connection.file_df_connection.{submodule}"), name) - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) diff --git a/onetl/core/__init__.py b/onetl/core/__init__.py index 9796b173..abbfd0e3 100644 --- a/onetl/core/__init__.py +++ b/onetl/core/__init__.py @@ -40,4 +40,4 @@ def __getattr__(name: str): ) return getattr(import_module(f"onetl.{submodule}"), name) - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) diff --git a/onetl/hwm/store/__init__.py b/onetl/hwm/store/__init__.py index 4fb2d991..929b1d40 100644 --- a/onetl/hwm/store/__init__.py +++ b/onetl/hwm/store/__init__.py @@ -45,4 +45,4 @@ def __getattr__(name: str): return getattr(import_module("etl_entities.hwm_store"), name) - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) diff --git a/onetl/strategy/hwm_store/__init__.py b/onetl/strategy/hwm_store/__init__.py index de994c20..2f89a595 100644 --- a/onetl/strategy/hwm_store/__init__.py +++ b/onetl/strategy/hwm_store/__init__.py @@ -39,7 +39,7 @@ def __getattr__(name: str): if name not in __all__: - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) message = f""" Imports from module {__name__!r} are deprecated since v0.6.0 and will be removed in v1.0.0. From 00aa722ead59983604ce6ec251e60fcf61f3e2df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 24 Oct 2024 08:48:10 +0000 Subject: [PATCH 26/28] [DOP-20393] Do not hide kinit error messages from user --- docs/changelog/next_release/+.bugfix.4.rst | 1 + onetl/connection/kerberos_helpers.py | 18 +++++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) create mode 100644 docs/changelog/next_release/+.bugfix.4.rst diff --git a/docs/changelog/next_release/+.bugfix.4.rst b/docs/changelog/next_release/+.bugfix.4.rst new file mode 100644 index 00000000..2cde0404 --- /dev/null +++ b/docs/changelog/next_release/+.bugfix.4.rst @@ -0,0 +1 @@ +Immediately show ``kinit`` errors to user, instead of hiding them. 
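The behaviour targeted here: when ``kinit``'s error stream is captured into a pipe, its failure messages are only visible if they are re-attached to the raised exception; leaving that stream untouched lets them reach the user's terminal as soon as ``kinit`` prints them. A rough sketch of the intended pattern (principal and password are placeholders):

    import subprocess

    cmd = ["kinit", "someuser@EXAMPLE.COM"]  # hypothetical principal
    with subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,  # do not show user 'Please enter password' banner
        # stderr is intentionally not redirected: errors are shown to the user immediately
    ) as proc:
        proc.communicate(b"placeholder-password")
        if proc.returncode:
            raise subprocess.CalledProcessError(proc.returncode, cmd)
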
diff --git a/onetl/connection/kerberos_helpers.py b/onetl/connection/kerberos_helpers.py index 6d8b11d9..72f151ee 100644 --- a/onetl/connection/kerberos_helpers.py +++ b/onetl/connection/kerberos_helpers.py @@ -27,17 +27,17 @@ def kinit_password(user: str, password: str) -> None: log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) with _kinit_lock: - proc = subprocess.Popen( + with subprocess.Popen( cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, stdin=subprocess.PIPE, - ) - - _stdout, stderr = proc.communicate(password.encode("utf-8")) - exit_code = proc.poll() - if exit_code: - raise subprocess.CalledProcessError(exit_code, cmd, output=stderr) + # do not show user 'Please enter password' banner + stderr=subprocess.PIPE, + # do not capture stderr, immediately show all errors to user + ) as proc: + proc.communicate(password.encode("utf-8")) + exit_code = proc.poll() + if exit_code: + raise subprocess.CalledProcessError(exit_code, cmd) def kinit(user: str, keytab: os.PathLike | None = None, password: str | None = None) -> None: From 652362075bdd275016bd52e9ec17045a80082009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Fri, 25 Oct 2024 09:01:43 +0000 Subject: [PATCH 27/28] [DOP-20393] Do not hide kinit error messages from user --- onetl/connection/kerberos_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/connection/kerberos_helpers.py b/onetl/connection/kerberos_helpers.py index 72f151ee..d0b3c0aa 100644 --- a/onetl/connection/kerberos_helpers.py +++ b/onetl/connection/kerberos_helpers.py @@ -31,7 +31,7 @@ def kinit_password(user: str, password: str) -> None: cmd, stdin=subprocess.PIPE, # do not show user 'Please enter password' banner - stderr=subprocess.PIPE, + stdout=subprocess.PIPE, # do not capture stderr, immediately show all errors to user ) as proc: proc.communicate(password.encode("utf-8")) From 493f8afdae019122c750012fc9a872c2bf79ccdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Fri, 25 Oct 2024 14:00:22 +0000 Subject: [PATCH 28/28] [DOP-20393] Prepare for release --- docs/changelog/0.12.1.rst | 23 +++++++++++++++++++++ docs/changelog/index.rst | 1 + docs/changelog/next_release/+.bugfix.1.rst | 1 - docs/changelog/next_release/+.bugfix.2.rst | 1 - docs/changelog/next_release/+.bugfix.3.rst | 1 - docs/changelog/next_release/+.bugfix.4.rst | 1 - docs/changelog/next_release/+.feature.1.rst | 1 - docs/changelog/next_release/310.doc.rst | 1 - docs/changelog/next_release/313.bugfix.rst | 1 - 9 files changed, 24 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/0.12.1.rst delete mode 100644 docs/changelog/next_release/+.bugfix.1.rst delete mode 100644 docs/changelog/next_release/+.bugfix.2.rst delete mode 100644 docs/changelog/next_release/+.bugfix.3.rst delete mode 100644 docs/changelog/next_release/+.bugfix.4.rst delete mode 100644 docs/changelog/next_release/+.feature.1.rst delete mode 100644 docs/changelog/next_release/310.doc.rst delete mode 100644 docs/changelog/next_release/313.bugfix.rst diff --git a/docs/changelog/0.12.1.rst b/docs/changelog/0.12.1.rst new file mode 100644 index 00000000..953cf3e1 --- /dev/null +++ b/docs/changelog/0.12.1.rst @@ -0,0 +1,23 @@ 
+0.12.1 (2024-10-28) +=================== + +Features +-------- + +- Log detected JDBC dialect while using ``DBWriter``. + + +Bug Fixes +--------- + +- Fix ``SparkMetricsRecorder`` failing when receiving ``SparkListenerTaskEnd`` without ``taskMetrics`` (e.g. executor was killed by OOM). (:github:pull:`313`) +- Call ``kinit`` before checking for HDFS active namenode. +- Wrap ``kinit`` with ``threading.Lock`` to avoid multithreading issues. +- Immediately show ``kinit`` errors to user, instead of hiding them. +- Use ``AttributeError`` instead of ``ImportError`` in module's ``__getattr__`` method, to make code compliant with Python spec. + + +Doc only Changes +---------------- + +- Add note about `spark-dialect-extension `_ package to Clickhouse connector documentation. (:github:pull:`310`) diff --git a/docs/changelog/index.rst b/docs/changelog/index.rst index 756f0cb1..812c5437 100644 --- a/docs/changelog/index.rst +++ b/docs/changelog/index.rst @@ -3,6 +3,7 @@ :caption: Changelog DRAFT + 0.12.1 0.12.0 0.11.2 0.11.1 diff --git a/docs/changelog/next_release/+.bugfix.1.rst b/docs/changelog/next_release/+.bugfix.1.rst deleted file mode 100644 index ed35c63e..00000000 --- a/docs/changelog/next_release/+.bugfix.1.rst +++ /dev/null @@ -1 +0,0 @@ -Call ``kinit`` before checking for HDFS active namenode. diff --git a/docs/changelog/next_release/+.bugfix.2.rst b/docs/changelog/next_release/+.bugfix.2.rst deleted file mode 100644 index 45687b1c..00000000 --- a/docs/changelog/next_release/+.bugfix.2.rst +++ /dev/null @@ -1 +0,0 @@ -Wrap ``kinit`` with ``threading.Lock`` to avoid multithreading issues. diff --git a/docs/changelog/next_release/+.bugfix.3.rst b/docs/changelog/next_release/+.bugfix.3.rst deleted file mode 100644 index f8f555e6..00000000 --- a/docs/changelog/next_release/+.bugfix.3.rst +++ /dev/null @@ -1 +0,0 @@ -Use ``AttributeError`` instead of ``ImportError`` in module's ``__getattr__`` method, to make code compliant with Python spec. diff --git a/docs/changelog/next_release/+.bugfix.4.rst b/docs/changelog/next_release/+.bugfix.4.rst deleted file mode 100644 index 2cde0404..00000000 --- a/docs/changelog/next_release/+.bugfix.4.rst +++ /dev/null @@ -1 +0,0 @@ -Immediately show ``kinit`` errors to user, instead of hiding them. diff --git a/docs/changelog/next_release/+.feature.1.rst b/docs/changelog/next_release/+.feature.1.rst deleted file mode 100644 index e48a1038..00000000 --- a/docs/changelog/next_release/+.feature.1.rst +++ /dev/null @@ -1 +0,0 @@ -Log detected JDBC dialect while using ``DBWriter``. diff --git a/docs/changelog/next_release/310.doc.rst b/docs/changelog/next_release/310.doc.rst deleted file mode 100644 index 469ccf67..00000000 --- a/docs/changelog/next_release/310.doc.rst +++ /dev/null @@ -1 +0,0 @@ -Add note about `spark-dialect-extension `_ package to Clickhouse connector documentation. diff --git a/docs/changelog/next_release/313.bugfix.rst b/docs/changelog/next_release/313.bugfix.rst deleted file mode 100644 index 5647f762..00000000 --- a/docs/changelog/next_release/313.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Fix ``SparkMetricsRecorder`` failing when receiving ``SparkListenerTaskEnd`` without ``taskMetrics`` (e.g. executor was killed by OOM).