Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-11904] - add has_data, raise_if_no_data methods in DBReader #203

Merged
merged 24 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
71cf999
[DOP-11752] - add support of incremental strategy in Kafka
maxim-lixakov Jan 23, 2024
b37780d
[DOP-11752] - update tests
maxim-lixakov Jan 24, 2024
8da10bc
[DOP-11752] - fix the behavior with changing partitions number in inc…
maxim-lixakov Jan 25, 2024
2b4fdba
[DOP-11752] - update tests
maxim-lixakov Jan 26, 2024
56c83d1
[DOP-11752] - fix incremental_with_new_partition test
maxim-lixakov Jan 26, 2024
dbe169c
[DOP-11904] - add has_data, raise_if_no_data methods in DBReader
maxim-lixakov Jan 29, 2024
b38bb9b
[DOP-11904] - update fixtures for Kafka tests
maxim-lixakov Jan 30, 2024
51dab70
[DOP-11904] - customize Oracle and MSSQL get_sql_query to handle limit
maxim-lixakov Jan 30, 2024
6515214
[DOP-11904] - customize Oracle and MSSQL get_sql_query, add unit tests
maxim-lixakov Jan 30, 2024
69cf0b5
[DOP-11904] - minor bug fix in fetching_hwm type
maxim-lixakov Jan 30, 2024
949891d
[DOP-11904] - add docs for using .has_data inside contextlib
maxim-lixakov Jan 31, 2024
79b64b6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
e8c3d33
[DOP-11904] - add documentation and tests for using has_data out of c…
maxim-lixakov Jan 31, 2024
3e53e63
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
922187a
[DOP-11904] - update has_data docstrings
maxim-lixakov Jan 31, 2024
ae96f7f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
ad045a1
[DOP-11904] - update has_data docstrings
maxim-lixakov Jan 31, 2024
b9c677d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
c9b9495
[DOP-11904] - update docstrings
maxim-lixakov Jan 31, 2024
7b714ed
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
6a59169
Update onetl/db/db_reader/db_reader.py
maxim-lixakov Jan 31, 2024
37c0a47
[DOP-11904] - update docstrings
maxim-lixakov Jan 31, 2024
d1bc4df
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
edd80ce
[DOP-11904] - update docstrings
maxim-lixakov Jan 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ Read data from MSSQL, transform & write to Hive.
options=MSSQL.ReadOptions(fetchsize=10000),
)

# checks that there is data in the table, otherwise raises exception
reader.raise_if_no_data()

# Read data to DataFrame
df = reader.run()
df.printSchema()
Expand Down
1 change: 1 addition & 0 deletions docs/changelog/next_release/203.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``has_data``, ``raise_if_no_data`` methods to ``DBReader`` class.
4 changes: 3 additions & 1 deletion docs/db/db_reader.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ DB Reader

DBReader
DBReader.run
maxim-lixakov marked this conversation as resolved.
Show resolved Hide resolved
DBReader.has_data
DBReader.raise_if_no_data

.. autoclass:: DBReader
:members: run
:members: run, has_data, raise_if_no_data
23 changes: 23 additions & 0 deletions onetl/connection/db_connection/mssql/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"{partition_column} % {num_partitions}"

def get_sql_query(
self,
table: str,
columns: list[str] | None = None,
where: str | list[str] | None = None,
hint: str | None = None,
limit: int | None = None,
compact: bool = False,
) -> str:
query = super().get_sql_query(
table=table,
columns=columns,
where=where,
hint=hint,
limit=0 if limit == 0 else None,
compact=compact,
)
# MSSQL-specific handling for the LIMIT clause using TOP
if limit is not None and limit > 0:
query = query.replace("SELECT", f"SELECT TOP {limit}", 1)

return query

def _serialize_datetime(self, value: datetime) -> str:
result = value.isoformat()
return f"CAST('{result}' AS datetime2)"
Expand Down
14 changes: 13 additions & 1 deletion onetl/connection/db_connection/oracle/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,24 @@ def get_sql_query(
new_columns = columns or ["*"]
if len(new_columns) > 1:
new_columns = [table + ".*" if column.strip() == "*" else column for column in new_columns]

where = where or []
if isinstance(where, str):
where = [where]

if limit is not None:
if limit == 0:
where = ["1=0"]
else:
# Oracle does not support LIMIT
where.append(f"ROWNUM <= {limit}")

return super().get_sql_query(
table=table,
columns=new_columns,
where=where,
hint=hint,
limit=limit,
limit=None,
compact=compact,
)

Expand Down
110 changes: 106 additions & 4 deletions onetl/db/db_reader/db_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ContainsGetDFSchemaMethod,
ContainsGetMinMaxValues,
)
from onetl.exception import NoDataError
from onetl.hooks import slot, support_hooks
from onetl.hwm import AutoDetectHWM, Edge, Window
from onetl.impl import FrozenModel, GenericOptions
Expand Down Expand Up @@ -501,6 +502,97 @@ def validate_options(cls, options, values):

return None

@slot
def has_data(self) -> bool:
maxim-lixakov marked this conversation as resolved.
Show resolved Hide resolved
"""Returns ``True`` if there is some data in the source, ``False`` otherwise. |support_hooks|

.. note::

This method can return different results depending on :ref:`strategy`

.. warning::

If :etl-entities:`hwm <hwm/index.html>` is used, then method should be called inside :ref:`strategy` context. And vise-versa, if HWM is not used, this method should not be called within strategy.

Raises
------
RuntimeError

Current strategy is not compatible with HWM parameter.

Examples
--------

.. code:: python

reader = DBReader(...)

# handle situation when there is no data in the source
if reader.has_data():
df = reader.run()
else:
# implement your handling logic here
...
"""
self._check_strategy()

if not self._connection_checked:
self._log_parameters()
self.connection.check()

window, limit = self._calculate_window_and_limit()
if limit == 0:
return False

df = self.connection.read_source_as_df(
source=str(self.source),
columns=self.columns,
hint=self.hint,
where=self.where,
df_schema=self.df_schema,
window=window,
limit=1,
**self._get_read_kwargs(),
)

return bool(df.take(1))

@slot
def raise_if_no_data(self) -> None:
"""Raises exception ``NoDataError`` if source does not contain any data. |support_hooks|

.. note::

This method can return different results depending on :ref:`strategy`

.. warning::

If :etl-entities:`hwm <hwm/index.html>` is used, then method should be called inside :ref:`strategy` context. And vise-versa, if HWM is not used, this method should not be called within strategy.

Raises
------
RuntimeError

Current strategy is not compatible with HWM parameter.

:obj:`onetl.exception.NoDataError`

There is no data in source.

Examples
--------

.. code:: python

reader = DBReader(...)

# ensure that there is some data in the source before reading it using Spark
reader.raise_if_no_data()
"""

if not self.has_data():
raise NoDataError(f"No data in the source: {self.source}")

@slot
def run(self) -> DataFrame:
"""
Expand All @@ -510,6 +602,10 @@ def run(self) -> DataFrame:

This method can return different results depending on :ref:`strategy`

.. warning::

If :etl-entities:`hwm <hwm/index.html>` is used, then method should be called inside :ref:`strategy` context. And vise-versa, if HWM is not used, this method should not be called within strategy.

Returns
-------
df : pyspark.sql.dataframe.DataFrame
Expand Down Expand Up @@ -541,6 +637,12 @@ def run(self) -> DataFrame:
self._connection_checked = True

window, limit = self._calculate_window_and_limit()

# update the HWM with the stop value
if self.hwm and window:
strategy: HWMStrategy = StrategyManager.get_current() # type: ignore[assignment]
strategy.update_hwm(window.stop_at.value)

df = self.connection.read_source_as_df(
source=str(self.source),
columns=self.columns,
Expand All @@ -562,7 +664,9 @@ def _check_strategy(self):

if self.hwm:
if not isinstance(strategy, HWMStrategy):
raise RuntimeError(f"{class_name}(hwm=...) cannot be used with {strategy_name}")
raise RuntimeError(
f"{class_name}(hwm=...) cannot be used with {strategy_name}. Check documentation DBReader.has_data(): https://onetl.readthedocs.io/en/stable/db/db_reader.html#onetl.db.db_reader.db_reader.DBReader.has_data.",
)
self._prepare_hwm(strategy, self.hwm)

elif isinstance(strategy, HWMStrategy):
Expand All @@ -578,7 +682,7 @@ def _prepare_hwm(self, strategy: HWMStrategy, hwm: ColumnHWM):
strategy.fetch_hwm()
return

if not isinstance(strategy.hwm, ColumnHWM) or strategy.hwm.name != hwm.name:
if not isinstance(strategy.hwm, (ColumnHWM, KeyValueHWM)) or strategy.hwm.name != hwm.name:
# exception raised when inside one strategy >1 processes on the same table but with different hwm columns
# are executed, example: test_postgres_strategy_incremental_hwm_set_twice
error_message = textwrap.dedent(
Expand Down Expand Up @@ -673,7 +777,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no
if start_value is not None and stop_value is not None:
# we already have start and stop values, nothing to do
window = Window(self.hwm.expression, start_from=strategy.current, stop_at=strategy.next)
strategy.update_hwm(window.stop_at.value)
return window, None

if not isinstance(self.connection, ContainsGetMinMaxValues):
Expand Down Expand Up @@ -737,7 +840,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no
stop_at=Edge(value=max_value),
)

strategy.update_hwm(window.stop_at.value)
return window, None

def _log_parameters(self) -> None:
Expand Down
6 changes: 2 additions & 4 deletions onetl/hooks/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ class Hook(Generic[T]): # noqa: WPS338
from onetl.hooks.hook import Hook, HookPriority


def some_func(*args, **kwargs):
...
def some_func(*args, **kwargs): ...


hook = Hook(callback=some_func, enabled=True, priority=HookPriority.FIRST)
Expand Down Expand Up @@ -209,8 +208,7 @@ def __call__(self, *args, **kwargs) -> T | ContextDecorator:
from onetl.hooks.hook import Hook, HookPriority


def some_func(*args, **kwargs):
...
def some_func(*args, **kwargs): ...


hook = Hook(callback=some_func)
Expand Down
42 changes: 14 additions & 28 deletions onetl/hooks/slot.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,31 +139,27 @@ def _prepare_hook_args(
@support_hooks
class MyClass:
@slot
def method(self, some, named="abc"):
...
def method(self, some, named="abc"): ...

then hook should have a compatible signature, like these ones:

.. code:: python

@MyClass.method.bind
@hook
def callback(self, some, named):
...
def callback(self, some, named): ...

.. code:: python

@MyClass.method.bind
@hook
def callback(self, some, **kwargs):
...
def callback(self, some, **kwargs): ...

.. code:: python

@MyClass.method.bind
@hook
def callback(my_class_instance, *args, **kwargs):
...
def callback(my_class_instance, *args, **kwargs): ...

.. note::

Expand Down Expand Up @@ -561,14 +557,12 @@ def suspend_hooks(self):
@support_hooks
class MyClass:
@slot
def my_method(self, arg):
...
def my_method(self, arg): ...


@MyClass.my_method.bind
@hook
def callback1(self, arg):
...
def callback1(self, arg): ...


obj = MyClass()
Expand Down Expand Up @@ -598,14 +592,12 @@ def resume_hooks(self):
@support_hooks
class MyClass:
@slot
def my_method(self, arg):
...
def my_method(self, arg): ...


@MyClass.my_method.bind
@hook
def callback1(self, arg):
...
def callback1(self, arg): ...


obj = MyClass()
Expand Down Expand Up @@ -656,36 +648,30 @@ def slot(method: Method) -> Method:
@support_hooks
class MyClass:
@slot
def my_method(self, arg):
...
def my_method(self, arg): ...

@slot # decorator should be on top of all other decorators
@classmethod
def class_method(cls):
...
def class_method(cls): ...

@slot # decorator should be on top of all other decorators
@staticmethod
def static_method(arg):
...
def static_method(arg): ...


@MyClass.my_method.bind
@hook
def callback1(self, arg):
...
def callback1(self, arg): ...


@MyClass.class_method.bind
@hook
def callback2(cls):
...
def callback2(cls): ...


@MyClass.static_method.bind
@hook
def callback3(arg):
...
def callback3(arg): ...


obj = MyClass()
Expand Down
Loading
Loading