From d876d9062dc56812f507c5415cb1ccf32654dc13 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 18 Sep 2023 13:57:46 +0300 Subject: [PATCH 01/16] [DOP-8837] - add draft Samba connection class --- .env.dependencies | 8 + .env.docker | 8 + .env.local | 8 + conftest.py | 1 + docker-compose.yml | 12 + onetl/connection/__init__.py | 2 + onetl/connection/file_connection/samba.py | 251 ++++++++++++++++++ requirements/samba.txt | 1 + requirements/tests/samba.txt | 1 + setup.py | 2 + .../fixtures/connections/file_connections.py | 1 + tests/fixtures/connections/samba.py | 63 +++++ .../test_samba_file_connection_integration.py | 39 +++ 13 files changed, 397 insertions(+) create mode 100644 onetl/connection/file_connection/samba.py create mode 100644 requirements/samba.txt create mode 100644 requirements/tests/samba.txt create mode 100644 tests/fixtures/connections/samba.py create mode 100644 tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py diff --git a/.env.dependencies b/.env.dependencies index af892898b..35f6b03ba 100644 --- a/.env.dependencies +++ b/.env.dependencies @@ -62,6 +62,14 @@ PASSWORD_ACCESS=true SUDO_ACCESS=true USER_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho +# Samba +ONETL_SAMBA_HOST=localhost +ONETL_SAMBA_PROTOCOL=SMB +ONETL_SAMBA_SHARE=445 +ONETL_SAMBA_SHARE=SmbShare +ONETL_SAMBA_USER=onetl +ONETL_SAMBA_PASSWORD=awd123fd1 + # WebDAV, FTP, FTPS APP_USER_NAME=onetl APP_USER_PASSWD=awd123fd1 diff --git a/.env.docker b/.env.docker index b9c2105aa..55fffbf9e 100644 --- a/.env.docker +++ b/.env.docker @@ -87,6 +87,14 @@ ONETL_SFTP_PORT=2222 ONETL_SFTP_USER=onetl ONETL_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho +# Samba +ONETL_SAMBA_HOST=localhost +ONETL_SAMBA_PROTOCOL=SMB +ONETL_SAMBA_SHARE=445 +ONETL_SAMBA_SHARE=SmbShare +ONETL_SAMBA_USER=onetl +ONETL_SAMBA_PASSWORD=awd123fd1 + # Webdav ONETL_WEBDAV_HOST=webdav ONETL_WEBDAV_PORT=80 diff --git a/.env.local b/.env.local index af2551dbd..874004b59 100644 --- a/.env.local +++ b/.env.local @@ -87,6 +87,14 @@ export ONETL_SFTP_PORT=2222 export ONETL_SFTP_USER=onetl export ONETL_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho +# Samba +export ONETL_SAMBA_HOST=localhost +export ONETL_SAMBA_PROTOCOL=SMB +export ONETL_SAMBA_SHARE=445 +export ONETL_SAMBA_SHARE=SmbShare +export ONETL_SAMBA_USER=onetl +export ONETL_SAMBA_PASSWORD=awd123fd1 + # Webdav export ONETL_WEBDAV_HOST=localhost export ONETL_WEBDAV_PORT=8000 diff --git a/conftest.py b/conftest.py index ab0b60a5c..52b6c5754 100644 --- a/conftest.py +++ b/conftest.py @@ -19,5 +19,6 @@ "tests.fixtures.connections.local_fs", "tests.fixtures.connections.s3", "tests.fixtures.connections.sftp", + "tests.fixtures.connections.samba", "tests.fixtures.connections.webdav", ] diff --git a/docker-compose.yml b/docker-compose.yml index a08d8fc38..4439e6319 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -164,6 +164,18 @@ services: networks: - onetl + samba: + image: elswork/samba + ports: + - "139:139" + - "445:445" + restart: unless-stopped + volumes: + - /path/to/share:/share/folder + command: '-u "1000:1000:onetl:d:awd123fd1" -s "SmbShare:/share/folder:rw:onetl"' + networks: + - onetl + s3: image: ${S3_IMAGE:-bitnami/minio:latest} restart: unless-stopped diff --git a/onetl/connection/__init__.py b/onetl/connection/__init__.py index 1c50f7fee..3e40e2a2a 100644 --- a/onetl/connection/__init__.py +++ b/onetl/connection/__init__.py @@ -37,6 +37,7 @@ from onetl.connection.file_connection.ftps import FTPS from onetl.connection.file_connection.hdfs import HDFS from onetl.connection.file_connection.s3 import S3 + from onetl.connection.file_connection.samba import Samba from onetl.connection.file_connection.sftp import SFTP from onetl.connection.file_connection.webdav import WebDAV from onetl.connection.file_df_connection.spark_hdfs import SparkHDFS @@ -62,6 +63,7 @@ "HDFS": "hdfs", "S3": "s3", "SFTP": "sftp", + "Samba": "samba", "WebDAV": "webdav", } diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py new file mode 100644 index 000000000..585153e27 --- /dev/null +++ b/onetl/connection/file_connection/samba.py @@ -0,0 +1,251 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import os +import stat +import textwrap +from io import BytesIO +from logging import getLogger +from typing import Optional + +from etl_entities.instance import Host +from pydantic import SecretStr, validator + +from onetl.connection.file_connection.file_connection import FileConnection +from onetl.hooks import slot, support_hooks +from onetl.impl import LocalPath, RemotePath, RemotePathStat + +try: + from smb.base import NotConnectedError + from smb.smb_structs import OperationFailure + from smb.SMBConnection import SMBConnection +except (ImportError, NameError) as e: + raise ImportError( + textwrap.dedent( + """ + Cannot import module "pysmb". + + Since onETL v0.7.0 you should install package as follows: + pip install onetl[samba] + + or + pip install onetl[files] + """, + ).strip(), + ) from e + + +log = getLogger(__name__) + + +@support_hooks +class Samba(FileConnection): + """Samba file connection. |support_hooks| + + Based on `pysmb library `_. + + Parameters + ---------- + host : str + Host of Samba source. For example: ``msk.mts.ru``. This is a required field. + + share : str + The name of the share on the Samba server. This is a required field. + + protocol : str, default: ``SMB`` + The protocol to use for the connection. Either ``SMB`` or ``NetBIOS``. + Affects the default port and the `is_direct_tcp` flag in `SMBConnection`. + + port : int, default: None + Port of Samba source. Can be overridden. + + domain : str, default: `` + Domain name for the Samba connection. Defaults to the same as `host`. + + auth_type : str, default: ``NTLMv2`` + The authentication type to use. Either ``NTLMv2`` or ``NTLMv1``. + Affects the `use_ntlm_v2` flag in `SMBConnection`. + + user : str, default: None + User, which have access to the file source. Can be `None` for anonymous connection. + + password : str, default: None + Password for file source connection. Can be `None` for anonymous connection. + + timeout : int, default: ``10`` + How long to wait for the server to send data before giving up. + """ + + host: Host + share: str + protocol: str = "SMB" + port: Optional[int] = None + domain: Optional[str] = "" + auth_type: str = "NTLMv2" + user: Optional[str] = None + password: Optional[SecretStr] = None + timeout: int = 10 + + @property + def instance_url(self) -> str: + return f"smb://{self.host}:{self.port}" + + @validator("port", pre=True, always=True) + def set_port_based_on_protocol(cls, port, values): + if port is None: + return 445 if values.get("protocol") == "SMB" else 139 + return port + + @slot + def check(self): + log.info("|%s| Checking connection availability...", self.__class__.__name__) + self._log_parameters() + try: + available_shares = {share.name for share in self.client.listShares()} + if self.share in available_shares: + log.info("|%s| Connection is available.", self.__class__.__name__) + else: + raise ConnectionError("Failed to connect to the Samba server.") + except (RuntimeError, ValueError): + # left validation errors intact + log.exception("|%s| Connection is unavailable", self.__class__.__name__) + raise + except Exception as exc: + log.exception("|%s| Connection is unavailable", self.__class__.__name__) + raise RuntimeError("Connection is unavailable") from exc + + return self + + @slot + def path_exists(self, path: os.PathLike | str) -> bool: + try: + self.client.getAttributes(self.share, str(path)) + return True + except OperationFailure: + return False + + def _scan_entries(self, path: RemotePath) -> list: + return self.client.listPath( + self.share, + str(path), + ) # pysmb do .replace('/', '\\'), doesn't work with type + + def _get_client(self) -> SMBConnection: + is_direct_tcp = self.protocol == "SMB" + use_ntlm_v2 = self.auth_type == "NTLMv2" + conn = SMBConnection( + username=self.user, + password=self.password.get_secret_value() if self.password else None, + my_name="optional_client_name", + remote_name=self.host, + domain=self.domain, + use_ntlm_v2=use_ntlm_v2, + sign_options=2, + is_direct_tcp=is_direct_tcp, + ) + conn.connect(self.host, port=self.port) + return conn # noqa: WPS331 + + def _is_client_closed(self, client: SMBConnection) -> bool: + try: + client.listShares() + except NotConnectedError: + return True + return False + + def _close_client(self, client: SMBConnection) -> None: + self.client.close() + + def _create_dir(self, path: RemotePath) -> None: + self.client.createDirectory( + self.share, + str(path), + ) # pysmb do .replace('/', '\\'), doesn't work with type + + def _upload_file(self, local_file_path: LocalPath, remote_file_path: str | RemotePath) -> None: + with open(local_file_path, "rb") as file_obj: + self.client.storeFile( + self.share, + str(remote_file_path), + file_obj, + ) # pysmb do .replace('/', '\\'), works with type + + def _rename_file(self, source: RemotePath, target: RemotePath) -> None: + # Implement your logic here + pass + + def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPath) -> None: + pass + + def _remove_dir(self, path: RemotePath) -> None: + # Implement your logic here + pass + + def _remove_file(self, remote_file_path: RemotePath) -> None: + # Implement your logic here + pass + + def _is_dir(self, path: RemotePath) -> bool: + return self.client.getAttributes(self.share, (os.fspath(path))).isDirectory + + def _is_file(self, path: RemotePath) -> bool: + return not self.client.getAttributes(self.share, (os.fspath(path))).isDirectory + + def _get_stat(self, path: RemotePath) -> RemotePathStat: + info = self.client.getAttributes(self.share, (os.fspath(path))) + + if self.is_dir(os.fspath(path)): + return RemotePathStat(st_mode=stat.S_IFDIR) + + return RemotePathStat( + st_size=info.file_size, + st_mtime=info.last_write_time, + st_uid=info.filename, + ) + + def _read_text(self, path: RemotePath, encoding: str, **kwargs) -> str: + file_obj = BytesIO() + self.client.retrieveFile( + self.share, + str(path), + file_obj, + ) # pysmb do .replace('/', '\\'), works with type + file_obj.seek(0) + return file_obj.read().decode(encoding) + + def _read_bytes(self, path: RemotePath, **kwargs) -> bytes: + # Implement your logic here + pass + + def _write_text(self, path: RemotePath, content: str, encoding: str, **kwargs) -> None: + # Implement your logic here + pass + + def _extract_name_from_entry(self, entry) -> str: + pass + + def _extract_stat_from_entry(self, top: RemotePath, entry) -> RemotePathStat: + pass + + def _is_dir_entry(self, top: RemotePath, entry) -> bool: + pass + + def _is_file_entry(self, top: RemotePath, entry) -> bool: + pass + + def _write_bytes(self, path: RemotePath, content: bytes, **kwargs) -> None: + # Implement your logic here + pass diff --git a/requirements/samba.txt b/requirements/samba.txt new file mode 100644 index 000000000..619ee4f51 --- /dev/null +++ b/requirements/samba.txt @@ -0,0 +1 @@ +pysmb diff --git a/requirements/tests/samba.txt b/requirements/tests/samba.txt new file mode 100644 index 000000000..619ee4f51 --- /dev/null +++ b/requirements/tests/samba.txt @@ -0,0 +1 @@ +pysmb diff --git a/setup.py b/setup.py index 422085535..f8b560707 100644 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ def parse_requirements(file: Path) -> list[str]: requirements_ftp = parse_requirements(here / "requirements" / "ftp.txt") requirements_sftp = parse_requirements(here / "requirements" / "sftp.txt") +requirements_samba = parse_requirements(here / "requirements" / "samba.txt") requirements_hdfs = parse_requirements(here / "requirements" / "hdfs.txt") requirements_s3 = parse_requirements(here / "requirements" / "s3.txt") requirements_webdav = parse_requirements(here / "requirements" / "webdav.txt") @@ -90,6 +91,7 @@ def parse_requirements(file: Path) -> list[str]: "ftp": requirements_ftp, "ftps": requirements_ftp, "sftp": requirements_sftp, + "samba": requirements_samba, "hdfs": requirements_hdfs, "s3": requirements_s3, "webdav": requirements_webdav, diff --git a/tests/fixtures/connections/file_connections.py b/tests/fixtures/connections/file_connections.py index e8ef7253e..5c1950a67 100644 --- a/tests/fixtures/connections/file_connections.py +++ b/tests/fixtures/connections/file_connections.py @@ -12,6 +12,7 @@ lazy_fixture("hdfs_file_connection"), lazy_fixture("s3_file_connection"), lazy_fixture("sftp_file_connection"), + lazy_fixture("samba_file_connection"), lazy_fixture("webdav_file_connection"), ], ) diff --git a/tests/fixtures/connections/samba.py b/tests/fixtures/connections/samba.py new file mode 100644 index 000000000..52a294d5b --- /dev/null +++ b/tests/fixtures/connections/samba.py @@ -0,0 +1,63 @@ +import os +from collections import namedtuple +from pathlib import PurePosixPath + +import pytest + +from tests.util.upload_files import upload_files + + +@pytest.fixture( + scope="session", + params=[ + pytest.param("real-samba", marks=[pytest.mark.samba, pytest.mark.file_connection, pytest.mark.connection]), + ], +) +def samba_server(): + SambaServer = namedtuple("SambaServer", ["host", "protocol", "port", "share", "user", "password"]) + + return SambaServer( + host=os.getenv("ONETL_SAMBA_HOST"), + protocol=os.getenv("ONETL_SAMBA_PROTOCOL"), + port=os.getenv("ONETL_SAMBA_PORT"), + share=os.getenv("ONETL_SAMBA_SHARE"), + user=os.getenv("ONETL_SAMBA_USER"), + password=os.getenv("ONETL_SAMBA_PASSWORD"), + ) + + +@pytest.fixture() +def samba_file_connection(samba_server): + from onetl.connection import Samba + + return Samba( + host=samba_server.host, + protocol=samba_server.protocol, + port=samba_server.port, + share=samba_server.share, + user=samba_server.user, + password=samba_server.password, + ) + + +@pytest.fixture() +def samba_file_connection_with_path(request, samba_file_connection): + connection = samba_file_connection + root = PurePosixPath("/data") + + def finalizer(): + connection.remove_dir(root, recursive=True) + + request.addfinalizer(finalizer) + + connection.remove_dir(root, recursive=True) + + return connection, root + + +@pytest.fixture() +def samba_file_connection_with_path_and_files(resource_path, samba_file_connection_with_path): + connection, upload_to = samba_file_connection_with_path + upload_from = resource_path / "file_connection" + files = upload_files(upload_from, upload_to, connection) + return connection, upload_to, files diff --git a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py new file mode 100644 index 000000000..453d8e4c2 --- /dev/null +++ b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py @@ -0,0 +1,39 @@ +import logging + +import pytest + +pytestmark = [pytest.mark.samba, pytest.mark.file_connection, pytest.mark.connection] + + +def test_samba_file_connection_check_success(samba_file_connection, caplog): + samba = samba_file_connection + with caplog.at_level(logging.INFO): + assert samba.check() == samba + + assert "|Samba|" in caplog.text + assert f"host = '{samba.host}'" in caplog.text + assert f"port = {samba.port}" in caplog.text + assert f"protocol = '{samba.protocol}'" in caplog.text + assert f"user = '{samba.user}'" in caplog.text + assert f"share = '{samba.share}'" in caplog.text + assert "timeout = 10" in caplog.text + assert "password = SecretStr('**********')" in caplog.text + assert samba.password.get_secret_value() not in caplog.text + + assert "Connection is available." in caplog.text + + +def test_samba_file_connection_check_failed(samba_server): + from onetl.connection import Samba + + samba = Samba( + host=samba_server.host, + share=samba_server.share, + protocol=samba_server.protocol, + port=samba_server.port, + user="unknown", + password="unknown", + ) + + with pytest.raises(RuntimeError, match="Connection is unavailable"): + samba.check() From 4674ee4fe6c25c12e10c3a489fac962036e696db Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 18 Sep 2023 18:15:55 +0300 Subject: [PATCH 02/16] [DOP-8837] - implement all abstract methods for Samba connection --- onetl/connection/file_connection/samba.py | 170 ++++++++++++------ .../fixtures/connections/file_connections.py | 2 + .../test_file_downloader_integration.py | 9 +- .../test_file_uploader_integration.py | 5 +- 4 files changed, 126 insertions(+), 60 deletions(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index 585153e27..8576f9aaf 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -60,20 +60,20 @@ class Samba(FileConnection): Parameters ---------- host : str - Host of Samba source. For example: ``msk.mts.ru``. This is a required field. + Host of Samba source. For example: ``msk.mts.ru``. share : str - The name of the share on the Samba server. This is a required field. + The name of the share on the Samba server. protocol : str, default: ``SMB`` The protocol to use for the connection. Either ``SMB`` or ``NetBIOS``. Affects the default port and the `is_direct_tcp` flag in `SMBConnection`. - port : int, default: None - Port of Samba source. Can be overridden. + port : int, default: 445 + Port of Samba source. domain : str, default: `` - Domain name for the Samba connection. Defaults to the same as `host`. + Domain name for the Samba connection. auth_type : str, default: ``NTLMv2`` The authentication type to use. Either ``NTLMv2`` or ``NTLMv1``. @@ -138,10 +138,35 @@ def path_exists(self, path: os.PathLike | str) -> bool: return False def _scan_entries(self, path: RemotePath) -> list: - return self.client.listPath( - self.share, - str(path), - ) # pysmb do .replace('/', '\\'), doesn't work with type + if self._is_dir(path): + return [ + entry + for entry in self.client.listPath( + self.share, + str(path), + ) + if entry.filename not in {".", ".."} # Filter out '.' and '..' + ] # pysmb do .replace('/', '\\'), doesn't work with type + return [self.client.getAttributes(self.share, (os.fspath(path)))] + + def _extract_name_from_entry(self, entry) -> str: + return entry.filename + + def _is_dir_entry(self, top: RemotePath, entry) -> bool: + return entry.isDirectory + + def _is_file_entry(self, top: RemotePath, entry) -> bool: + return not entry.isDirectory + + def _extract_stat_from_entry(self, top: RemotePath, entry) -> RemotePathStat: + if entry.isDirectory: + return RemotePathStat(st_mode=stat.S_IFDIR) + + return RemotePathStat( + st_size=entry.file_size, + st_mtime=entry.last_write_time, + st_uid=entry.filename, + ) def _get_client(self) -> SMBConnection: is_direct_tcp = self.protocol == "SMB" @@ -169,13 +194,45 @@ def _is_client_closed(self, client: SMBConnection) -> bool: def _close_client(self, client: SMBConnection) -> None: self.client.close() - def _create_dir(self, path: RemotePath) -> None: - self.client.createDirectory( + def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPath) -> None: + with open(local_file_path, "wb") as local_file: + self.client.retrieveFile( + self.share, + str(remote_file_path), + local_file, + show_progress=True, + ) # pysmb do .replace('/', '\\'), doesn't work with type + + def _get_stat(self, path: RemotePath) -> RemotePathStat: + info = self.client.getAttributes(self.share, (os.fspath(path))) + + if self.is_dir(os.fspath(path)): + return RemotePathStat(st_mode=stat.S_IFDIR) + + return RemotePathStat( + st_size=info.file_size, + st_mtime=info.last_write_time, + st_uid=info.filename, + ) + + def _remove_file(self, remote_file_path: RemotePath) -> None: + self.client.deleteFiles( self.share, - str(path), + str(remote_file_path), ) # pysmb do .replace('/', '\\'), doesn't work with type - def _upload_file(self, local_file_path: LocalPath, remote_file_path: str | RemotePath) -> None: + def _create_dir(self, path: RemotePath) -> None: + path_parts = str(path).strip("/").split("/") + current_path_parts = [] + for part in path_parts: # create dirs sequentially as .createDirectory(...) cannot create nested dirs + current_path_parts.append(part) + current_path = "/".join(current_path_parts) + try: + self.client.createDirectory(self.share, current_path) + except OperationFailure: + pass + + def _upload_file(self, local_file_path: LocalPath, remote_file_path: RemotePath) -> None: with open(local_file_path, "rb") as file_obj: self.client.storeFile( self.share, @@ -184,37 +241,26 @@ def _upload_file(self, local_file_path: LocalPath, remote_file_path: str | Remot ) # pysmb do .replace('/', '\\'), works with type def _rename_file(self, source: RemotePath, target: RemotePath) -> None: - # Implement your logic here - pass - - def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPath) -> None: - pass + self.client.rename( + self.share, + str(source), + str(target), + ) # pysmb do .replace('/', '\\'), doesn't work with type def _remove_dir(self, path: RemotePath) -> None: - # Implement your logic here - pass + files = self.client.listPath(self.share, str(path)) - def _remove_file(self, remote_file_path: RemotePath) -> None: - # Implement your logic here - pass - - def _is_dir(self, path: RemotePath) -> bool: - return self.client.getAttributes(self.share, (os.fspath(path))).isDirectory - - def _is_file(self, path: RemotePath) -> bool: - return not self.client.getAttributes(self.share, (os.fspath(path))).isDirectory + for f in files: + if f.filename not in {".", ".."}: # Skip current and parent directory entries + full_path = f"{path}/{f.filename}" + if f.isDirectory: + # Recursively delete subdirectory + self._remove_dir(full_path) + else: + # Delete file + self.client.deleteFiles(self.share, full_path) - def _get_stat(self, path: RemotePath) -> RemotePathStat: - info = self.client.getAttributes(self.share, (os.fspath(path))) - - if self.is_dir(os.fspath(path)): - return RemotePathStat(st_mode=stat.S_IFDIR) - - return RemotePathStat( - st_size=info.file_size, - st_mtime=info.last_write_time, - st_uid=info.filename, - ) + self.client.deleteDirectory(self.share, str(path)) def _read_text(self, path: RemotePath, encoding: str, **kwargs) -> str: file_obj = BytesIO() @@ -226,26 +272,36 @@ def _read_text(self, path: RemotePath, encoding: str, **kwargs) -> str: file_obj.seek(0) return file_obj.read().decode(encoding) - def _read_bytes(self, path: RemotePath, **kwargs) -> bytes: - # Implement your logic here - pass + def _read_bytes(self, path: RemotePath) -> bytes: + file_obj = BytesIO() + self.client.retrieveFile( + self.share, + str(path), + file_obj, + ) # pysmb replaces '/', works with type + file_obj.seek(0) + return file_obj.read() - def _write_text(self, path: RemotePath, content: str, encoding: str, **kwargs) -> None: - # Implement your logic here - pass + def _write_text(self, path: RemotePath, content: str, encoding: str) -> None: + file_obj = BytesIO(content.encode(encoding)) - def _extract_name_from_entry(self, entry) -> str: - pass + self.client.storeFile( + self.share, + str(path), + file_obj, + ) # pysmb replaces '/', works with type - def _extract_stat_from_entry(self, top: RemotePath, entry) -> RemotePathStat: - pass + def _write_bytes(self, path: RemotePath, content: bytes) -> None: + file_obj = BytesIO(content) - def _is_dir_entry(self, top: RemotePath, entry) -> bool: - pass + self.client.storeFile( + self.share, + str(path), + file_obj, + ) # pysmb replaces '/', works with type - def _is_file_entry(self, top: RemotePath, entry) -> bool: - pass + def _is_dir(self, path: RemotePath) -> bool: + return self.client.getAttributes(self.share, (os.fspath(path))).isDirectory - def _write_bytes(self, path: RemotePath, content: bytes, **kwargs) -> None: - # Implement your logic here - pass + def _is_file(self, path: RemotePath) -> bool: + return not self.client.getAttributes(self.share, (os.fspath(path))).isDirectory diff --git a/tests/fixtures/connections/file_connections.py b/tests/fixtures/connections/file_connections.py index 5c1950a67..f44240894 100644 --- a/tests/fixtures/connections/file_connections.py +++ b/tests/fixtures/connections/file_connections.py @@ -27,6 +27,7 @@ def file_connection(request): lazy_fixture("hdfs_file_connection_with_path"), lazy_fixture("s3_file_connection_with_path"), lazy_fixture("sftp_file_connection_with_path"), + lazy_fixture("samba_file_connection_with_path"), lazy_fixture("webdav_file_connection_with_path"), ], ) @@ -41,6 +42,7 @@ def file_connection_with_path(request): lazy_fixture("hdfs_file_connection_with_path_and_files"), lazy_fixture("s3_file_connection_with_path_and_files"), lazy_fixture("sftp_file_connection_with_path_and_files"), + lazy_fixture("samba_file_connection_with_path_and_files"), lazy_fixture("webdav_file_connection_with_path_and_files"), ], ) diff --git a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py index ed290ab43..6a79c8972 100644 --- a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py +++ b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py @@ -661,7 +661,9 @@ def test_file_downloader_mode_replace_entire_directory( assert download_result.successful # folder contains only downloaded files - assert sorted(item for item in local_path.glob("**/*") if item.is_file()) == sorted(download_result.successful) + assert sorted(item.resolve() for item in local_path.glob("**/*") if item.is_file()) == sorted( + path.resolve() for path in download_result.successful + ) assert not temp_file.exists() @@ -755,7 +757,10 @@ def finalizer(): local_path=file.name, ) - with pytest.raises(NotADirectoryError, match=rf"'{file.name}' \(kind='file', .*\) is not a directory"): + with pytest.raises( + NotADirectoryError, + match=rf"'(/private)?{file.name}' \(kind='file', .*\) is not a directory", + ): downloader.run() diff --git a/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py b/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py index 522cf2dd4..364f34ed1 100644 --- a/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py +++ b/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py @@ -490,7 +490,10 @@ def test_file_uploader_run_local_path_not_a_directory(file_connection): with tempfile.NamedTemporaryFile() as file: uploader = FileUploader(connection=file_connection, target_path=target_path, local_path=file.name) - with pytest.raises(NotADirectoryError, match=rf"'{file.name}' \(kind='file', .*\) is not a directory"): + with pytest.raises( + NotADirectoryError, + match=rf"'(/private)?{file.name}' \(kind='file', .*\) is not a directory", + ): # On macOS, /var is a symlink to /private/var uploader.run() From 7a914f7f8e43ffa63606bcc58ff456e7f3ffdfcf Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 18 Sep 2023 18:23:51 +0300 Subject: [PATCH 03/16] [DOP-8837] - updated comments --- onetl/connection/file_connection/samba.py | 25 +++++++++++------------ 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index 8576f9aaf..51c0c76e4 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -53,7 +53,7 @@ @support_hooks class Samba(FileConnection): - """Samba file connection. |support_hooks| + """Samba file connection. Based on `pysmb library `_. @@ -146,7 +146,7 @@ def _scan_entries(self, path: RemotePath) -> list: str(path), ) if entry.filename not in {".", ".."} # Filter out '.' and '..' - ] # pysmb do .replace('/', '\\'), doesn't work with type + ] # pysmb replaces '/', not works with type return [self.client.getAttributes(self.share, (os.fspath(path)))] def _extract_name_from_entry(self, entry) -> str: @@ -201,7 +201,7 @@ def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPat str(remote_file_path), local_file, show_progress=True, - ) # pysmb do .replace('/', '\\'), doesn't work with type + ) # pysmb replaces '/', not works with type def _get_stat(self, path: RemotePath) -> RemotePathStat: info = self.client.getAttributes(self.share, (os.fspath(path))) @@ -219,7 +219,7 @@ def _remove_file(self, remote_file_path: RemotePath) -> None: self.client.deleteFiles( self.share, str(remote_file_path), - ) # pysmb do .replace('/', '\\'), doesn't work with type + ) # pysmb replaces '/', not works with type def _create_dir(self, path: RemotePath) -> None: path_parts = str(path).strip("/").split("/") @@ -238,26 +238,25 @@ def _upload_file(self, local_file_path: LocalPath, remote_file_path: RemotePath) self.share, str(remote_file_path), file_obj, - ) # pysmb do .replace('/', '\\'), works with type + ) # pysmb replaces '/', not works with type def _rename_file(self, source: RemotePath, target: RemotePath) -> None: self.client.rename( self.share, str(source), str(target), - ) # pysmb do .replace('/', '\\'), doesn't work with type + ) # pysmb replaces '/', not works with type def _remove_dir(self, path: RemotePath) -> None: files = self.client.listPath(self.share, str(path)) for f in files: - if f.filename not in {".", ".."}: # Skip current and parent directory entries + if f.filename not in {".", ".."}: # skip current and parent directory entries full_path = f"{path}/{f.filename}" if f.isDirectory: - # Recursively delete subdirectory + # recursively delete subdirectory self._remove_dir(full_path) else: - # Delete file self.client.deleteFiles(self.share, full_path) self.client.deleteDirectory(self.share, str(path)) @@ -268,7 +267,7 @@ def _read_text(self, path: RemotePath, encoding: str, **kwargs) -> str: self.share, str(path), file_obj, - ) # pysmb do .replace('/', '\\'), works with type + ) # pysmb replaces '/', not works with type file_obj.seek(0) return file_obj.read().decode(encoding) @@ -278,7 +277,7 @@ def _read_bytes(self, path: RemotePath) -> bytes: self.share, str(path), file_obj, - ) # pysmb replaces '/', works with type + ) # pysmb replaces '/', not works with type file_obj.seek(0) return file_obj.read() @@ -289,7 +288,7 @@ def _write_text(self, path: RemotePath, content: str, encoding: str) -> None: self.share, str(path), file_obj, - ) # pysmb replaces '/', works with type + ) # pysmb replaces '/', not works with type def _write_bytes(self, path: RemotePath, content: bytes) -> None: file_obj = BytesIO(content) @@ -298,7 +297,7 @@ def _write_bytes(self, path: RemotePath, content: bytes) -> None: self.share, str(path), file_obj, - ) # pysmb replaces '/', works with type + ) # pysmb replaces '/', not works with type def _is_dir(self, path: RemotePath) -> bool: return self.client.getAttributes(self.share, (os.fspath(path))).isDirectory From b60f918af3c4e053b87b1748293d6fd3989e3f3f Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 18 Sep 2023 19:00:02 +0300 Subject: [PATCH 04/16] [DOP-8837] - add unit tests --- .../test_samba_unit.py | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 tests/tests_unit/tests_file_connection_unit/test_samba_unit.py diff --git a/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py b/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py new file mode 100644 index 000000000..5bf51c4fc --- /dev/null +++ b/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py @@ -0,0 +1,48 @@ +import pytest + +from onetl.connection import FileConnection + +pytestmark = [pytest.mark.samba, pytest.mark.file_connection, pytest.mark.connection] + + +def test_samba_connection(): + from onetl.connection import Samba + + samba = Samba(host="some_host", share="share_name", user="some_user", password="pwd") + assert isinstance(samba, FileConnection) + assert samba.host == "some_host" + assert samba.protocol == "SMB" + assert samba.domain == "" + assert samba.auth_type == "NTLMv2" + assert samba.timeout == 10 + assert samba.port == 445 + assert samba.user == "some_user" + assert samba.password != "pwd" + assert samba.password.get_secret_value() == "pwd" + + assert "password='pwd'" not in str(samba) + assert "password='pwd'" not in repr(samba) + + +def test_samba_connection_with_net_bios(): + from onetl.connection import Samba + + samba = Samba(host="some_host", share="share_name", user="some_user", password="pwd", protocol="NetBIOS") + assert samba.protocol == "NetBIOS" + assert samba.port == 139 + + +@pytest.mark.parametrize("protocol", ["SMB", "NetBIOS"]) +def test_samba_connection_with_custom_port(protocol): + from onetl.connection import Samba + + samba = Samba(host="some_host", share="share_name", user="some_user", password="pwd", protocol=protocol, port=444) + assert samba.protocol == protocol + assert samba.port == 444 + + +def test_samba_connection_without_mandatory_args(): + from onetl.connection import Samba + + with pytest.raises(ValueError): + Samba() From ed9fc971895b4f9d61740fea232773310528bddb Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Mon, 18 Sep 2023 19:15:08 +0300 Subject: [PATCH 05/16] [DOP-8837] - update pytest.ini --- pytest.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/pytest.ini b/pytest.ini index 5e40e75d7..3c71e8eb6 100644 --- a/pytest.ini +++ b/pytest.ini @@ -24,5 +24,6 @@ markers = postgres: Postgres tests s3: S3 tests sftp: SFTP tests + samba: Samba tests teradata: Teradata tests webdav: WebDAV tests From 0766476ad6131ea38faebf661a1ea67fcf85b4b9 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 11:36:41 +0300 Subject: [PATCH 06/16] [DOP-8837] - update docker-compose.yml --- .env.dependencies | 8 -------- .env.local | 2 ++ docker-compose.yml | 6 +++--- docker/samba/.keep | 0 4 files changed, 5 insertions(+), 11 deletions(-) create mode 100644 docker/samba/.keep diff --git a/.env.dependencies b/.env.dependencies index 35f6b03ba..af892898b 100644 --- a/.env.dependencies +++ b/.env.dependencies @@ -62,14 +62,6 @@ PASSWORD_ACCESS=true SUDO_ACCESS=true USER_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho -# Samba -ONETL_SAMBA_HOST=localhost -ONETL_SAMBA_PROTOCOL=SMB -ONETL_SAMBA_SHARE=445 -ONETL_SAMBA_SHARE=SmbShare -ONETL_SAMBA_USER=onetl -ONETL_SAMBA_PASSWORD=awd123fd1 - # WebDAV, FTP, FTPS APP_USER_NAME=onetl APP_USER_PASSWD=awd123fd1 diff --git a/.env.local b/.env.local index 874004b59..a41fc2efd 100644 --- a/.env.local +++ b/.env.local @@ -90,6 +90,8 @@ export ONETL_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho # Samba export ONETL_SAMBA_HOST=localhost export ONETL_SAMBA_PROTOCOL=SMB +export ONETL_SAMBA_UID=1000 +export ONETL_SAMBA_GID=1000 export ONETL_SAMBA_SHARE=445 export ONETL_SAMBA_SHARE=SmbShare export ONETL_SAMBA_USER=onetl diff --git a/docker-compose.yml b/docker-compose.yml index 4439e6319..d19b09133 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -166,13 +166,13 @@ services: samba: image: elswork/samba + restart: unless-stopped ports: - "139:139" - "445:445" - restart: unless-stopped volumes: - - /path/to/share:/share/folder - command: '-u "1000:1000:onetl:d:awd123fd1" -s "SmbShare:/share/folder:rw:onetl"' + - ./docker/samba:/share/folder + command: '-u "${ONETL_SAMBA_UID}:${ONETL_SAMBA_GID}:${ONETL_SAMBA_USER}:${ONETL_SAMBA_USER}:${ONETL_SAMBA_PASSWORD}" -s "${ONETL_SAMBA_SHARE}:/share/folder:rw:${ONETL_SAMBA_USER}"' networks: - onetl diff --git a/docker/samba/.keep b/docker/samba/.keep new file mode 100644 index 000000000..e69de29bb From 612919539e49bab2f43933f5e2b2675dc1fe87a7 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 12:18:04 +0300 Subject: [PATCH 07/16] [DOP-8837] - add .sh script to allow dirs/files creation --- docker/Dockerfile | 1 + docker/samba/.keep | 0 docker/samba/on_post_init.sh | 4 ++++ 3 files changed, 5 insertions(+) delete mode 100644 docker/samba/.keep create mode 100755 docker/samba/on_post_init.sh diff --git a/docker/Dockerfile b/docker/Dockerfile index 103cc2b26..817d4eab2 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -57,6 +57,7 @@ RUN pip install \ -r /app/requirements/hdfs.txt \ -r /app/requirements/s3.txt \ -r /app/requirements/sftp.txt \ + -r /app/requirements/samba.txt \ -r /app/requirements/webdav.txt \ -r /app/requirements/kerberos.txt \ -r /app/requirements/docs.txt \ diff --git a/docker/samba/.keep b/docker/samba/.keep deleted file mode 100644 index e69de29bb..000000000 diff --git a/docker/samba/on_post_init.sh b/docker/samba/on_post_init.sh new file mode 100755 index 000000000..f71af2a03 --- /dev/null +++ b/docker/samba/on_post_init.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# allow create files and directories +chmod 777 /share/folder From f9e4b06384b94e7786495301998b5ad1d3391681 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 12:30:56 +0300 Subject: [PATCH 08/16] [DOP-8837] - remove extra .env --- .env.docker | 8 -------- onetl/connection/file_connection/samba.py | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/.env.docker b/.env.docker index 55fffbf9e..b9c2105aa 100644 --- a/.env.docker +++ b/.env.docker @@ -87,14 +87,6 @@ ONETL_SFTP_PORT=2222 ONETL_SFTP_USER=onetl ONETL_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho -# Samba -ONETL_SAMBA_HOST=localhost -ONETL_SAMBA_PROTOCOL=SMB -ONETL_SAMBA_SHARE=445 -ONETL_SAMBA_SHARE=SmbShare -ONETL_SAMBA_USER=onetl -ONETL_SAMBA_PASSWORD=awd123fd1 - # Webdav ONETL_WEBDAV_HOST=webdav ONETL_WEBDAV_PORT=80 diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index 51c0c76e4..d6274b184 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -182,7 +182,7 @@ def _get_client(self) -> SMBConnection: is_direct_tcp=is_direct_tcp, ) conn.connect(self.host, port=self.port) - return conn # noqa: WPS331 + return conn def _is_client_closed(self, client: SMBConnection) -> bool: try: From 9faefa276abbc9438ce6add1fb06399acf02886e Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 16:42:40 +0300 Subject: [PATCH 09/16] [DOP-8837] - modified Samba connection methods --- .env.local | 2 +- onetl/connection/file_connection/samba.py | 134 ++++++++---------- .../test_file_downloader_integration.py | 12 +- .../test_file_uploader_integration.py | 5 +- .../test_samba_file_connection_integration.py | 23 ++- .../test_samba_unit.py | 1 - 6 files changed, 93 insertions(+), 84 deletions(-) diff --git a/.env.local b/.env.local index a41fc2efd..2e05030f3 100644 --- a/.env.local +++ b/.env.local @@ -92,7 +92,7 @@ export ONETL_SAMBA_HOST=localhost export ONETL_SAMBA_PROTOCOL=SMB export ONETL_SAMBA_UID=1000 export ONETL_SAMBA_GID=1000 -export ONETL_SAMBA_SHARE=445 +export ONETL_SAMBA_PORT=445 export ONETL_SAMBA_SHARE=SmbShare export ONETL_SAMBA_USER=onetl export ONETL_SAMBA_PASSWORD=awd123fd1 diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index d6274b184..61532e577 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -19,7 +19,8 @@ import textwrap from io import BytesIO from logging import getLogger -from typing import Optional +from pathlib import Path +from typing import Literal, Optional, Union from etl_entities.instance import Host from pydantic import SecretStr, validator @@ -29,7 +30,6 @@ from onetl.impl import LocalPath, RemotePath, RemotePathStat try: - from smb.base import NotConnectedError from smb.smb_structs import OperationFailure from smb.SMBConnection import SMBConnection except (ImportError, NameError) as e: @@ -38,7 +38,7 @@ """ Cannot import module "pysmb". - Since onETL v0.7.0 you should install package as follows: + You should install package as follows: pip install onetl[samba] or @@ -60,7 +60,7 @@ class Samba(FileConnection): Parameters ---------- host : str - Host of Samba source. For example: ``msk.mts.ru``. + Host of Samba source. For example: ``mydomain.com``. share : str The name of the share on the Samba server. @@ -73,7 +73,7 @@ class Samba(FileConnection): Port of Samba source. domain : str, default: `` - Domain name for the Samba connection. + Domain name for the Samba connection. Empty strings means use ``host`` as domain name. auth_type : str, default: ``NTLMv2`` The authentication type to use. Either ``NTLMv2`` or ``NTLMv1``. @@ -85,30 +85,21 @@ class Samba(FileConnection): password : str, default: None Password for file source connection. Can be `None` for anonymous connection. - timeout : int, default: ``10`` - How long to wait for the server to send data before giving up. """ host: Host share: str - protocol: str = "SMB" + protocol: Union[Literal["SMB"], Literal["NetBIOS"]] = "SMB" port: Optional[int] = None domain: Optional[str] = "" - auth_type: str = "NTLMv2" + auth_type: Union[Literal["NTLMv1"], Literal["NTLMv2"]] = "NTLMv2" user: Optional[str] = None password: Optional[SecretStr] = None - timeout: int = 10 @property def instance_url(self) -> str: return f"smb://{self.host}:{self.port}" - @validator("port", pre=True, always=True) - def set_port_based_on_protocol(cls, port, values): - if port is None: - return 445 if values.get("protocol") == "SMB" else 139 - return port - @slot def check(self): log.info("|%s| Checking connection availability...", self.__class__.__name__) @@ -118,11 +109,13 @@ def check(self): if self.share in available_shares: log.info("|%s| Connection is available.", self.__class__.__name__) else: + log.error( + "|%s| Share: %s not found among existing shares: %s", + self.__class__.__name__, + self.share, + available_shares, + ) raise ConnectionError("Failed to connect to the Samba server.") - except (RuntimeError, ValueError): - # left validation errors intact - log.exception("|%s| Connection is unavailable", self.__class__.__name__) - raise except Exception as exc: log.exception("|%s| Connection is unavailable", self.__class__.__name__) raise RuntimeError("Connection is unavailable") from exc @@ -143,11 +136,11 @@ def _scan_entries(self, path: RemotePath) -> list: entry for entry in self.client.listPath( self.share, - str(path), + os.fspath(path), ) if entry.filename not in {".", ".."} # Filter out '.' and '..' - ] # pysmb replaces '/', not works with type - return [self.client.getAttributes(self.share, (os.fspath(path)))] + ] + return [self.client.getAttributes(self.share, os.fspath(path))] def _extract_name_from_entry(self, entry) -> str: return entry.filename @@ -185,11 +178,14 @@ def _get_client(self) -> SMBConnection: return conn def _is_client_closed(self, client: SMBConnection) -> bool: + if client is None: + return True try: - client.listShares() - except NotConnectedError: + socket_fileno = client.sock.fileno() + except (AttributeError, OSError): return True - return False + + return socket_fileno == -1 def _close_client(self, client: SMBConnection) -> None: self.client.close() @@ -198,13 +194,12 @@ def _download_file(self, remote_file_path: RemotePath, local_file_path: LocalPat with open(local_file_path, "wb") as local_file: self.client.retrieveFile( self.share, - str(remote_file_path), + os.fspath(remote_file_path), local_file, - show_progress=True, - ) # pysmb replaces '/', not works with type + ) def _get_stat(self, path: RemotePath) -> RemotePathStat: - info = self.client.getAttributes(self.share, (os.fspath(path))) + info = self.client.getAttributes(self.share, os.fspath(path)) if self.is_dir(os.fspath(path)): return RemotePathStat(st_mode=stat.S_IFDIR) @@ -218,89 +213,84 @@ def _get_stat(self, path: RemotePath) -> RemotePathStat: def _remove_file(self, remote_file_path: RemotePath) -> None: self.client.deleteFiles( self.share, - str(remote_file_path), - ) # pysmb replaces '/', not works with type + os.fspath(remote_file_path), + ) def _create_dir(self, path: RemotePath) -> None: - path_parts = str(path).strip("/").split("/") - current_path_parts = [] - for part in path_parts: # create dirs sequentially as .createDirectory(...) cannot create nested dirs - current_path_parts.append(part) - current_path = "/".join(current_path_parts) + path_obj = Path(path) + for parent in reversed(path_obj.parents): + # create dirs sequentially as .createDirectory(...) cannot create nested dirs try: - self.client.createDirectory(self.share, current_path) + self.client.createDirectory(self.share, os.fspath(parent)) except OperationFailure: pass + try: + self.client.createDirectory(self.share, os.fspath(path)) + except OperationFailure: + pass def _upload_file(self, local_file_path: LocalPath, remote_file_path: RemotePath) -> None: with open(local_file_path, "rb") as file_obj: self.client.storeFile( self.share, - str(remote_file_path), + os.fspath(remote_file_path), file_obj, - ) # pysmb replaces '/', not works with type + ) def _rename_file(self, source: RemotePath, target: RemotePath) -> None: self.client.rename( self.share, - str(source), - str(target), - ) # pysmb replaces '/', not works with type + os.fspath(source), + os.fspath(target), + ) def _remove_dir(self, path: RemotePath) -> None: - files = self.client.listPath(self.share, str(path)) + files = self.client.listPath(self.share, os.fspath(path)) - for f in files: - if f.filename not in {".", ".."}: # skip current and parent directory entries - full_path = f"{path}/{f.filename}" - if f.isDirectory: + for item in files: + if item.filename not in {".", ".."}: # skip current and parent directory entries + full_path = path / item.filename + if item.isDirectory: # recursively delete subdirectory self._remove_dir(full_path) else: - self.client.deleteFiles(self.share, full_path) + self.client.deleteFiles(self.share, os.fspath(full_path)) - self.client.deleteDirectory(self.share, str(path)) + self.client.deleteDirectory(self.share, os.fspath(path)) - def _read_text(self, path: RemotePath, encoding: str, **kwargs) -> str: - file_obj = BytesIO() - self.client.retrieveFile( - self.share, - str(path), - file_obj, - ) # pysmb replaces '/', not works with type - file_obj.seek(0) - return file_obj.read().decode(encoding) + def _read_text(self, path: RemotePath, encoding: str) -> str: + return self._read_bytes(path).decode(encoding) def _read_bytes(self, path: RemotePath) -> bytes: file_obj = BytesIO() self.client.retrieveFile( self.share, - str(path), + os.fspath(path), file_obj, - ) # pysmb replaces '/', not works with type + ) file_obj.seek(0) return file_obj.read() def _write_text(self, path: RemotePath, content: str, encoding: str) -> None: - file_obj = BytesIO(content.encode(encoding)) - - self.client.storeFile( - self.share, - str(path), - file_obj, - ) # pysmb replaces '/', not works with type + self._write_bytes(path, bytes(content, encoding)) def _write_bytes(self, path: RemotePath, content: bytes) -> None: file_obj = BytesIO(content) self.client.storeFile( self.share, - str(path), + os.fspath(path), file_obj, - ) # pysmb replaces '/', not works with type + ) def _is_dir(self, path: RemotePath) -> bool: - return self.client.getAttributes(self.share, (os.fspath(path))).isDirectory + return self.client.getAttributes(self.share, os.fspath(path)).isDirectory def _is_file(self, path: RemotePath) -> bool: - return not self.client.getAttributes(self.share, (os.fspath(path))).isDirectory + return not self.client.getAttributes(self.share, os.fspath(path)).isDirectory + + @validator("port", pre=True, always=True) + def _set_port_based_on_protocol(cls, port, values): + if port is None: + return 445 if values.get("protocol") == "SMB" else 139 + return port diff --git a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py index 6a79c8972..0a932dd46 100644 --- a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py +++ b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py @@ -635,10 +635,11 @@ def test_file_downloader_mode_replace_entire_directory( caplog, ): file_connection, remote_path, _ = file_connection_with_path_and_files + # Reason for using .resolve(): https://stackoverflow.com/a/58719476 if local_dir_exist: - local_path = tmp_path_factory.mktemp("local_path") + local_path = tmp_path_factory.mktemp("local_path").resolve() else: - local_path = Path(tempfile.gettempdir()) / secrets.token_hex() + local_path = Path(tempfile.gettempdir()).resolve() / secrets.token_hex() temp_file = local_path / secrets.token_hex(5) if local_dir_exist: @@ -661,9 +662,7 @@ def test_file_downloader_mode_replace_entire_directory( assert download_result.successful # folder contains only downloaded files - assert sorted(item.resolve() for item in local_path.glob("**/*") if item.is_file()) == sorted( - path.resolve() for path in download_result.successful - ) + assert sorted(item for item in local_path.glob("**/*") if item.is_file()) == sorted(download_result.successful) assert not temp_file.exists() @@ -757,9 +756,10 @@ def finalizer(): local_path=file.name, ) + # Reason for .realpath(): https://stackoverflow.com/a/58719476 with pytest.raises( NotADirectoryError, - match=rf"'(/private)?{file.name}' \(kind='file', .*\) is not a directory", + match=rf"'{os.path.realpath(file.name)}' \(kind='file', .*\) is not a directory", ): downloader.run() diff --git a/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py b/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py index 364f34ed1..feedeaa45 100644 --- a/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py +++ b/tests/tests_integration/tests_core_integration/test_file_uploader_integration.py @@ -490,10 +490,11 @@ def test_file_uploader_run_local_path_not_a_directory(file_connection): with tempfile.NamedTemporaryFile() as file: uploader = FileUploader(connection=file_connection, target_path=target_path, local_path=file.name) + # Reason for .realpath(): https://stackoverflow.com/a/58719476 with pytest.raises( NotADirectoryError, - match=rf"'(/private)?{file.name}' \(kind='file', .*\) is not a directory", - ): # On macOS, /var is a symlink to /private/var + match=rf"'{os.path.realpath(file.name)}' \(kind='file', .*\) is not a directory", + ): uploader.run() diff --git a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py index 453d8e4c2..3b6385c7e 100644 --- a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py +++ b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py @@ -16,14 +16,33 @@ def test_samba_file_connection_check_success(samba_file_connection, caplog): assert f"protocol = '{samba.protocol}'" in caplog.text assert f"user = '{samba.user}'" in caplog.text assert f"share = '{samba.share}'" in caplog.text - assert "timeout = 10" in caplog.text assert "password = SecretStr('**********')" in caplog.text assert samba.password.get_secret_value() not in caplog.text assert "Connection is available." in caplog.text -def test_samba_file_connection_check_failed(samba_server): +def test_samba_file_connection_check_not_existing_share_failed(samba_server, caplog): + from onetl.connection import Samba + + not_existing_share = "NotExistingShare" + samba = Samba( + host=samba_server.host, + share=not_existing_share, + protocol=samba_server.protocol, + port=samba_server.port, + user=samba_server.user, + password=samba_server.password, + ) + + with caplog.at_level(logging.INFO): + with pytest.raises(RuntimeError, match="Connection is unavailable"): + samba.check() + + assert f"Share: {not_existing_share} not found among existing shares" in caplog.text + + +def test_samba_file_connection_check_runtime_failed(samba_server): from onetl.connection import Samba samba = Samba( diff --git a/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py b/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py index 5bf51c4fc..42f95b368 100644 --- a/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py +++ b/tests/tests_unit/tests_file_connection_unit/test_samba_unit.py @@ -14,7 +14,6 @@ def test_samba_connection(): assert samba.protocol == "SMB" assert samba.domain == "" assert samba.auth_type == "NTLMv2" - assert samba.timeout == 10 assert samba.port == 445 assert samba.user == "some_user" assert samba.password != "pwd" From 5792e9a97ce4217a5fc95878d54e57dd5fc9f466 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 16:44:16 +0300 Subject: [PATCH 10/16] [DOP-8837] - update Samba connection method --- onetl/connection/file_connection/samba.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index 61532e577..cbe180ebe 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -125,7 +125,7 @@ def check(self): @slot def path_exists(self, path: os.PathLike | str) -> bool: try: - self.client.getAttributes(self.share, str(path)) + self.client.getAttributes(self.share, os.fspath(path)) return True except OperationFailure: return False From 60120dd9b7fc7a7b1311398091528e75b4975cc8 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 17:07:18 +0300 Subject: [PATCH 11/16] [DOP-8837] - update Samba connection method --- onetl/connection/file_connection/samba.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index cbe180ebe..a6b0daf8f 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -178,8 +178,6 @@ def _get_client(self) -> SMBConnection: return conn def _is_client_closed(self, client: SMBConnection) -> bool: - if client is None: - return True try: socket_fileno = client.sock.fileno() except (AttributeError, OSError): From 731f6dbadb9b47830fa17602c0498b9be41e583a Mon Sep 17 00:00:00 2001 From: Maxim Liksakov <67663774+maxim-lixakov@users.noreply.github.com> Date: Tue, 19 Sep 2023 17:01:38 +0300 Subject: [PATCH 12/16] Update onetl/connection/file_connection/samba.py Co-authored-by: Maxim Martynov --- onetl/connection/file_connection/samba.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index a6b0daf8f..1b9715427 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -110,7 +110,7 @@ def check(self): log.info("|%s| Connection is available.", self.__class__.__name__) else: log.error( - "|%s| Share: %s not found among existing shares: %s", + "|%s| Share %r not found among existing shares %r", self.__class__.__name__, self.share, available_shares, From 4328b0cd946b86643dcc8e0257e1750113be877f Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 17:23:18 +0300 Subject: [PATCH 13/16] [DOP-8837] - update Samba connection method --- onetl/connection/file_connection/samba.py | 6 ++---- .../test_samba_file_connection_integration.py | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index 1b9715427..300cc578e 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -222,10 +222,8 @@ def _create_dir(self, path: RemotePath) -> None: self.client.createDirectory(self.share, os.fspath(parent)) except OperationFailure: pass - try: - self.client.createDirectory(self.share, os.fspath(path)) - except OperationFailure: - pass + + self.client.createDirectory(self.share, os.fspath(path)) def _upload_file(self, local_file_path: LocalPath, remote_file_path: RemotePath) -> None: with open(local_file_path, "rb") as file_obj: diff --git a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py index 3b6385c7e..62381c5a8 100644 --- a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py +++ b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py @@ -39,7 +39,7 @@ def test_samba_file_connection_check_not_existing_share_failed(samba_server, cap with pytest.raises(RuntimeError, match="Connection is unavailable"): samba.check() - assert f"Share: {not_existing_share} not found among existing shares" in caplog.text + assert f"Share {not_existing_share} not found among existing shares" in caplog.text def test_samba_file_connection_check_runtime_failed(samba_server): From 83fd1f5b3889b80d8f53e39fc89160ee0d025e40 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Tue, 19 Sep 2023 23:12:36 +0300 Subject: [PATCH 14/16] [DOP-8837] - update _create_dir method in Samba connection --- onetl/connection/file_connection/samba.py | 4 ++-- .../test_samba_file_connection_integration.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/onetl/connection/file_connection/samba.py b/onetl/connection/file_connection/samba.py index 300cc578e..7a7f21132 100644 --- a/onetl/connection/file_connection/samba.py +++ b/onetl/connection/file_connection/samba.py @@ -219,9 +219,9 @@ def _create_dir(self, path: RemotePath) -> None: for parent in reversed(path_obj.parents): # create dirs sequentially as .createDirectory(...) cannot create nested dirs try: - self.client.createDirectory(self.share, os.fspath(parent)) + self.client.getAttributes(self.share, os.fspath(parent)) except OperationFailure: - pass + self.client.createDirectory(self.share, os.fspath(parent)) self.client.createDirectory(self.share, os.fspath(path)) diff --git a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py index 62381c5a8..7c5c8f5d5 100644 --- a/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py +++ b/tests/tests_integration/tests_file_connection_integration/test_samba_file_connection_integration.py @@ -39,7 +39,7 @@ def test_samba_file_connection_check_not_existing_share_failed(samba_server, cap with pytest.raises(RuntimeError, match="Connection is unavailable"): samba.check() - assert f"Share {not_existing_share} not found among existing shares" in caplog.text + assert f"Share '{not_existing_share}' not found among existing shares" in caplog.text def test_samba_file_connection_check_runtime_failed(samba_server): From 6db6b9496a026f0fef92c711953231cb29a1be81 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Wed, 20 Sep 2023 13:19:14 +0300 Subject: [PATCH 15/16] [DOP-8837] - hardcode env vars in docker compose configuration --- .env.docker | 10 ++++++++++ docker-compose.yml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.env.docker b/.env.docker index b9c2105aa..807740725 100644 --- a/.env.docker +++ b/.env.docker @@ -87,6 +87,16 @@ ONETL_SFTP_PORT=2222 ONETL_SFTP_USER=onetl ONETL_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho +# Samba +ONETL_SAMBA_HOST=localhost +ONETL_SAMBA_PROTOCOL=SMB +ONETL_SAMBA_UID=1000 +ONETL_SAMBA_GID=1000 +ONETL_SAMBA_PORT=445 +ONETL_SAMBA_SHARE=SmbShare +ONETL_SAMBA_USER=onetl +ONETL_SAMBA_PASSWORD=awd123fd1 + # Webdav ONETL_WEBDAV_HOST=webdav ONETL_WEBDAV_PORT=80 diff --git a/docker-compose.yml b/docker-compose.yml index d19b09133..3d93c02af 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -172,7 +172,7 @@ services: - "445:445" volumes: - ./docker/samba:/share/folder - command: '-u "${ONETL_SAMBA_UID}:${ONETL_SAMBA_GID}:${ONETL_SAMBA_USER}:${ONETL_SAMBA_USER}:${ONETL_SAMBA_PASSWORD}" -s "${ONETL_SAMBA_SHARE}:/share/folder:rw:${ONETL_SAMBA_USER}"' + command: '-u "1000:1000:onetl:onetl:awd123fd1" -s "SmbShare:/share/folder:rw:onetl"' networks: - onetl From 51edbc16b41311de9788e1308a2f92c85ba8509f Mon Sep 17 00:00:00 2001 From: Maxim Liksakov <67663774+maxim-lixakov@users.noreply.github.com> Date: Wed, 20 Sep 2023 13:31:15 +0300 Subject: [PATCH 16/16] Update .env.docker Co-authored-by: Maxim Martynov --- .env.docker | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.env.docker b/.env.docker index 807740725..cb0394806 100644 --- a/.env.docker +++ b/.env.docker @@ -88,7 +88,7 @@ ONETL_SFTP_USER=onetl ONETL_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho # Samba -ONETL_SAMBA_HOST=localhost +ONETL_SAMBA_HOST=samba ONETL_SAMBA_PROTOCOL=SMB ONETL_SAMBA_UID=1000 ONETL_SAMBA_GID=1000