Commit 3d5bfe5

[DOP-18743] Update jobDescription format

dolfinus committed Sep 2, 2024
1 parent 61695dc commit 3d5bfe5
Showing 5 changed files with 11 additions and 22 deletions.
4 changes: 2 additions & 2 deletions docs/changelog/0.12.0.rst
@@ -39,8 +39,8 @@ Features
are not returned as method call result. (:github:pull:`303`)

- Generate default ``jobDescription`` based on currently executed method. Examples:
- * ``DBWriter[schema.table].run() -> Postgres[host:5432/database]``
- * ``MongoDB[localhost:27017/admin] -> DBReader[mycollection].run()``
+ * ``DBWriter.run(schema.table) -> Postgres[host:5432/database]``
+ * ``MongoDB[localhost:27017/admin] -> DBReader.has_data(mycollection)``
* ``Hive[cluster].execute()``

If the user already set a custom ``jobDescription``, it will be left intact. (:github:pull:`304`)
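As background for this change: the generated strings above are applied to Spark through onetl's `override_job_description` context manager, visible in the diffs below. A minimal sketch of how such a context manager can work, assuming it wraps `SparkContext.setJobDescription` and honors a pre-existing user-set description (the actual onetl implementation may differ):

```python
from contextlib import contextmanager

@contextmanager
def override_job_description(spark, job_description: str):
    # Spark keeps the description in the "spark.job.description" local
    # property; a non-None value means the user already set a custom one,
    # which we leave intact per the changelog note above.
    sc = spark.sparkContext
    original = sc.getLocalProperty("spark.job.description")
    if original is None:
        sc.setJobDescription(job_description)
    try:
        yield
    finally:
        # Restore the previous state (passing None clears the description)
        sc.setJobDescription(original)
```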
7 changes: 2 additions & 5 deletions onetl/db/db_reader/db_reader.py
@@ -542,7 +542,7 @@ def has_data(self) -> bool:
"""
self._check_strategy()

job_description = f"{self}.has_data()"
job_description = f"{self.__class__.__name__}.has_data({self.source})"
with override_job_description(self.connection.spark, job_description):
if not self._connection_checked:
self._log_parameters()
@@ -635,7 +635,7 @@ def run(self) -> DataFrame:

self._check_strategy()

job_description = f"{self}.run() -> {self.connection}"
job_description = f"{self.__class__.__name__}.run({self.source}) -> {self.connection}"
with override_job_description(self.connection.spark, job_description):
if not self._connection_checked:
self._log_parameters()
@@ -663,9 +663,6 @@ def run(self) -> DataFrame:
entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
return df

- def __str__(self):
-     return f"{self.__class__.__name__}[{self.source}]"

def _check_strategy(self):
strategy = StrategyManager.get_current()
class_name = type(self).__name__
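For illustration, here is what the new format yields for a read, using hypothetical stand-in values matching the changelog examples (the real strings come from `self.source` and `str(self.connection)`):

```python
# Hypothetical values, for illustration only
class_name = "DBReader"                      # self.__class__.__name__
source = "schema.table"                      # self.source
connection = "Postgres[host:5432/database]"  # str(self.connection)

print(f"{class_name}.run({source}) -> {connection}")
# DBReader.run(schema.table) -> Postgres[host:5432/database]
```

Dropping `__str__` in favor of spelling out `self.__class__.__name__` in the f-strings is what moves the source name from `DBReader[schema.table]` into the method arguments.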
5 changes: 1 addition & 4 deletions onetl/db/db_writer/db_writer.py
@@ -203,7 +203,7 @@ def run(self, df: DataFrame) -> None:

entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")

job_description = f"{self}.run() -> {self.connection}"
job_description = f"{self.__class__.__name__}.run({self.target}) -> {self.connection}"
with override_job_description(self.connection.spark, job_description):
if not self._connection_checked:
self._log_parameters()
@@ -240,9 +240,6 @@ def run(self, df: DataFrame) -> None:

entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")

- def __str__(self):
-     return f"{self.__class__.__name__}[{self.target}]"

def _log_parameters(self) -> None:
log.info("|Spark| -> |%s| Writing DataFrame to target using parameters:", self.connection.__class__.__name__)
log_with_indent(log, "target = '%s'", self.target)
11 changes: 5 additions & 6 deletions onetl/file/file_df_reader/file_df_reader.py
@@ -211,7 +211,11 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
if not self._connection_checked:
self._log_parameters(files)

job_description = f"{self}.run() -> {self.connection}"
if files:
job_description = f"{self.__class__.__name__}.run([..files..]) -> {self.connection}"
else:
job_description = f"{self.__class__.__name__}.run({self.source_path}) -> {self.connection}"

with override_job_description(self.connection.spark, job_description):
paths: FileSet[PurePathProtocol] = FileSet()
if files is not None:
@@ -229,11 +233,6 @@ def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
return df

- def __str__(self):
-     if self.source_path:
-         return f"{self.__class__.__name__}[{os.fspath(self.source_path)}]"
-     return f"{self.__class__.__name__}"

def _read_files(self, paths: FileSet[PurePathProtocol]) -> DataFrame:
log.info("|%s| Paths to be read:", self.__class__.__name__)
log_lines(log, str(paths))
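The new branch above keeps the description short when an explicit file list is passed: the list is summarized as `[..files..]` instead of being embedded verbatim, while directory reads show `source_path`. A sketch with hypothetical values:

```python
# Hypothetical values, for illustration only
class_name = "FileDFReader"   # self.__class__.__name__
connection = "SparkLocalFS"   # str(self.connection); actual shape may differ
source_path = "/data/input"   # self.source_path
files = ["a.csv", "b.csv"]    # explicit file list passed to run()

if files:
    job_description = f"{class_name}.run([..files..]) -> {connection}"
else:
    job_description = f"{class_name}.run({source_path}) -> {connection}"

print(job_description)  # FileDFReader.run([..files..]) -> SparkLocalFS
```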
6 changes: 1 addition & 5 deletions onetl/file/file_df_writer/file_df_writer.py
@@ -3,7 +3,6 @@
from __future__ import annotations

import logging
- import os
from typing import TYPE_CHECKING

try:
@@ -125,7 +124,7 @@ def run(self, df: DataFrame) -> None:
if df.isStreaming:
raise ValueError(f"DataFrame is streaming. {self.__class__.__name__} supports only batch DataFrames.")

job_description = f"{self}).run() -> {self.connection}"
job_description = f"{self.__class__.__name__}.run({self.target_path}) -> {self.connection}"
with override_job_description(self.connection.spark, job_description):
if not self._connection_checked:
self._log_parameters(df)
@@ -162,9 +161,6 @@ def run(self, df: DataFrame) -> None:

entity_boundary_log(log, f"{self.__class__.__name__}.run() ends", char="-")

- def __str__(self):
-     return f"{self.__class__.__name__}[{os.fspath(self.target_path)}]"

def _log_parameters(self, df: DataFrame) -> None:
log.info("|Spark| -> |%s| Writing dataframe using parameters:", self.connection.__class__.__name__)
log_with_indent(log, "target_path = '%s'", self.target_path)
