Skip to content

Commit

Permalink
[DOP-20393] Remove lock for HDFS active namenode check
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Sep 30, 2024
1 parent 23e5926 commit e3ee8a4
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 12 deletions.
8 changes: 2 additions & 6 deletions onetl/connection/file_connection/hdfs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import textwrap
from contextlib import suppress
from logging import getLogger
from threading import Lock
from typing import TYPE_CHECKING, Optional, Tuple

from etl_entities.instance import Cluster, Host
Expand Down Expand Up @@ -221,7 +220,6 @@ class HDFS(FileConnection, RenameDirMixin):
# TODO: remove in v1.0.0
slots = Slots

_active_host_lock: Lock = PrivateAttr(default_factory=Lock)
_active_host: Optional[Host] = PrivateAttr(default=None)

@slot
Expand Down Expand Up @@ -434,10 +432,8 @@ def _get_host(self) -> str:

def _get_conn_str(self) -> str:
# cache active host to reduce number of requests.
# acquire a lock to avoid sending the same request for each thread.
with self._active_host_lock:
if not self._active_host:
self._active_host = self._get_host()
if not self._active_host:
self._active_host = self._get_host()
return f"http://{self._active_host}:{self.webhdfs_port}"

def _get_client(self) -> Client:
Expand Down
8 changes: 2 additions & 6 deletions onetl/connection/file_df_connection/spark_hdfs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
from contextlib import suppress
from pathlib import Path
from threading import Lock
from typing import TYPE_CHECKING, Optional

from etl_entities.instance import Cluster, Host
Expand Down Expand Up @@ -155,7 +154,6 @@ class SparkHDFS(SparkFileDFConnection):
host: Optional[Host] = None
ipc_port: int = Field(default=8020, alias="port")

_active_host_lock: Lock = PrivateAttr(default_factory=Lock)
_active_host: Optional[Host] = PrivateAttr(default=None)

@slot
Expand Down Expand Up @@ -345,10 +343,8 @@ def _get_host(self) -> str:

def _get_conn_str(self) -> str:
# cache active host to reduce number of requests.
# acquire a lock to avoid sending the same request for each thread.
with self._active_host_lock:
if not self._active_host:
self._active_host = self._get_host()
if not self._active_host:
self._active_host = self._get_host()
return f"hdfs://{self._active_host}:{self.ipc_port}"

def _convert_to_url(self, path: PurePathProtocol) -> str:
Expand Down

0 comments on commit e3ee8a4

Please sign in to comment.