diff --git a/docs/changelog/0.12.0.rst b/docs/changelog/0.12.0.rst
index 4a09682a..34c9c18b 100644
--- a/docs/changelog/0.12.0.rst
+++ b/docs/changelog/0.12.0.rst
@@ -39,8 +39,8 @@ Features
   are not returned as method call result. (:github:pull:`303`)
 - Generate default ``jobDescription`` based on currently executed method. Examples:
 
-  * ``DBWriter[schema.table].run() -> Postgres[host:5432/database]``
-  * ``MongoDB[localhost:27017/admin] -> DBReader[mycollection].run()``
+  * ``DBWriter.run(schema.table) -> Postgres[host:5432/database]``
+  * ``MongoDB[localhost:27017/admin] -> DBReader.has_data(mycollection)``
   * ``Hive[cluster].execute()``
 
   If user already set custom ``jobDescription``, it will left intact. (:github:pull:`304`)
diff --git a/onetl/db/db_reader/db_reader.py b/onetl/db/db_reader/db_reader.py
index 42f17db3..4ad3d236 100644
--- a/onetl/db/db_reader/db_reader.py
+++ b/onetl/db/db_reader/db_reader.py
@@ -542,7 +542,7 @@ def has_data(self) -> bool:
         """
         self._check_strategy()
 
-        job_description = f"{self}.has_data()"
+        job_description = f"{self.__class__.__name__}.has_data({self.source})"
         with override_job_description(self.connection.spark, job_description):
             if not self._connection_checked:
                 self._log_parameters()
@@ -635,7 +635,7 @@ def run(self) -> DataFrame:
 
         self._check_strategy()
 
-        job_description = f"{self}.run() -> {self.connection}"
+        job_description = f"{self.__class__.__name__}.run({self.source}) -> {self.connection}"
         with override_job_description(self.connection.spark, job_description):
             if not self._connection_checked:
                 self._log_parameters()
@@ -663,9 +663,6 @@ def run(self) -> DataFrame:
         entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
         return df
 
-    def __str__(self):
-        return f"{self.__class__.__name__}[{self.source}]"
-
     def _check_strategy(self):
         strategy = StrategyManager.get_current()
         class_name = type(self).__name__
diff --git a/onetl/db/db_writer/db_writer.py b/onetl/db/db_writer/db_writer.py
index 5206c07d..3bcf63aa 100644
--- a/onetl/db/db_writer/db_writer.py
+++ b/onetl/db/db_writer/db_writer.py
@@ -203,7 +203,7 @@ def run(self, df: DataFrame) -> None:
 
         entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")
 
-        job_description = f"{self}.run() -> {self.connection}"
+        job_description = f"{self.__class__.__name__}.run({self.target}) -> {self.connection}"
         with override_job_description(self.connection.spark, job_description):
             if not self._connection_checked:
                 self._log_parameters()
@@ -240,9 +240,6 @@ def run(self, df: DataFrame) -> None:
 
         entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
 
-    def __str__(self):
-        return f"{self.__class__.__name__}[{self.target}]"
-
     def _log_parameters(self) -> None:
         log.info("|Spark| -> |%s| Writing DataFrame to target using parameters:", self.connection.__class__.__name__)
         log_with_indent(log, "target = '%s'", self.target)
diff --git a/onetl/file/file_df_reader/file_df_reader.py b/onetl/file/file_df_reader/file_df_reader.py
index ed83bbc5..36aab796 100644
--- a/onetl/file/file_df_reader/file_df_reader.py
+++ b/onetl/file/file_df_reader/file_df_reader.py
@@ -211,7 +211,11 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
         if not self._connection_checked:
             self._log_parameters(files)
 
-        job_description = f"{self}.run() -> {self.connection}"
+        if files:
+            job_description = f"{self.__class__.__name__}.run([..files..]) -> {self.connection}"
+        else:
+            job_description = f"{self.__class__.__name__}.run({self.source_path}) -> {self.connection}"
+
         with override_job_description(self.connection.spark, job_description):
             paths: FileSet[PurePathProtocol] = FileSet()
             if files is not None:
@@ -229,11 +233,6 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
         entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
         return df
 
-    def __str__(self):
-        if self.source_path:
-            return f"{self.__class__.__name__}[{os.fspath(self.source_path)}]"
-        return f"{self.__class__.__name__}"
-
     def _read_files(self, paths: FileSet[PurePathProtocol]) -> DataFrame:
         log.info("|%s| Paths to be read:", self.__class__.__name__)
         log_lines(log, str(paths))
diff --git a/onetl/file/file_df_writer/file_df_writer.py b/onetl/file/file_df_writer/file_df_writer.py
index aeda5f7b..037fc7ee 100644
--- a/onetl/file/file_df_writer/file_df_writer.py
+++ b/onetl/file/file_df_writer/file_df_writer.py
@@ -3,7 +3,6 @@
 from __future__ import annotations
 
 import logging
-import os
 from typing import TYPE_CHECKING
 
 try:
@@ -125,7 +124,7 @@ def run(self, df: DataFrame) -> None:
         if df.isStreaming:
             raise ValueError(f"DataFrame is streaming. {self.__class__.__name__} supports only batch DataFrames.")
 
-        job_description = f"{self}).run() -> {self.connection}"
+        job_description = f"{self.__class__.__name__}.run({self.target_path}) -> {self.connection}"
         with override_job_description(self.connection.spark, job_description):
             if not self._connection_checked:
                 self._log_parameters(df)
@@ -162,9 +161,6 @@ def run(self, df: DataFrame) -> None:
 
         entity_boundary_log(log, f"{self.__class__.__name__}.run() ends", char="-")
 
-    def __str__(self):
-        return f"{self.__class__.__name__}[{os.fspath(self.target_path)}]"
-
     def _log_parameters(self, df: DataFrame) -> None:
         log.info("|Spark| -> |%s| Writing dataframe using parameters:", self.connection.__class__.__name__)
         log_with_indent(log, "target_path = '%s'", self.target_path)
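For context on what every hunk above relies on: each ``run()``/``has_data()`` body builds a human-readable
``job_description`` string and wraps the actual Spark work in the ``override_job_description`` context manager,
so all Spark jobs spawned inside show that label in the Spark UI. Below is a minimal sketch of how such a
context manager could work, assuming PySpark's standard ``SparkContext.setJobDescription``/``getLocalProperty``
API. This is an illustration only, not onetl's actual implementation; the helper's real module path,
signature, and internals may differ.

from contextlib import contextmanager

@contextmanager
def override_job_description(spark, job_description):
    sc = spark.sparkContext
    # setJobDescription() stores its value in the local property
    # "spark.job.description", which the Spark UI displays for each job.
    previous = sc.getLocalProperty("spark.job.description")
    # Per the changelog entry above, a jobDescription already set by the
    # user is left intact: only fill in the default when nothing is set.
    if previous is None:
        sc.setJobDescription(job_description)
    try:
        yield
    finally:
        # Restore the prior state (setting None clears the description).
        sc.setJobDescription(previous)

Checking the previous value before overriding is what makes the "left intact" guarantee hold: a
user-supplied description set outside the ``with`` block survives, while the generated default
(e.g. ``DBWriter.run(schema.table) -> Postgres[host:5432/database]``) is cleared once the call finishes.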