From e3ee8a441a314f6d4cec9859a7757f9d6565ff8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 30 Sep 2024 07:38:00 +0000 Subject: [PATCH] [DOP-20393] Remove lock for HDFS active namenode check --- onetl/connection/file_connection/hdfs/connection.py | 8 ++------ .../file_df_connection/spark_hdfs/connection.py | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/onetl/connection/file_connection/hdfs/connection.py b/onetl/connection/file_connection/hdfs/connection.py index f64d4fad..74eae72d 100644 --- a/onetl/connection/file_connection/hdfs/connection.py +++ b/onetl/connection/file_connection/hdfs/connection.py @@ -7,7 +7,6 @@ import textwrap from contextlib import suppress from logging import getLogger -from threading import Lock from typing import TYPE_CHECKING, Optional, Tuple from etl_entities.instance import Cluster, Host @@ -221,7 +220,6 @@ class HDFS(FileConnection, RenameDirMixin): # TODO: remove in v1.0.0 slots = Slots - _active_host_lock: Lock = PrivateAttr(default_factory=Lock) _active_host: Optional[Host] = PrivateAttr(default=None) @slot @@ -434,10 +432,8 @@ def _get_host(self) -> str: def _get_conn_str(self) -> str: # cache active host to reduce number of requests. - # acquire a lock to avoid sending the same request for each thread. - with self._active_host_lock: - if not self._active_host: - self._active_host = self._get_host() + if not self._active_host: + self._active_host = self._get_host() return f"http://{self._active_host}:{self.webhdfs_port}" def _get_client(self) -> Client: diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index 4de806de..12f17504 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -7,7 +7,6 @@ import os from contextlib import suppress from pathlib import Path -from threading import Lock from typing import TYPE_CHECKING, Optional from etl_entities.instance import Cluster, Host @@ -155,7 +154,6 @@ class SparkHDFS(SparkFileDFConnection): host: Optional[Host] = None ipc_port: int = Field(default=8020, alias="port") - _active_host_lock: Lock = PrivateAttr(default_factory=Lock) _active_host: Optional[Host] = PrivateAttr(default=None) @slot @@ -345,10 +343,8 @@ def _get_host(self) -> str: def _get_conn_str(self) -> str: # cache active host to reduce number of requests. - # acquire a lock to avoid sending the same request for each thread. - with self._active_host_lock: - if not self._active_host: - self._active_host = self._get_host() + if not self._active_host: + self._active_host = self._get_host() return f"hdfs://{self._active_host}:{self.ipc_port}" def _convert_to_url(self, path: PurePathProtocol) -> str: