[DOP-11904] - add has_data, raise_if_no_data methods in DBReader #203

Merged — 24 commits merged on Feb 1, 2024

Changes from 19 commits

Commits (24)
71cf999
[DOP-11752] - add support of incremental strategy in Kafka
maxim-lixakov Jan 23, 2024
b37780d
[DOP-11752] - update tests
maxim-lixakov Jan 24, 2024
8da10bc
[DOP-11752] - fix the behavior with changing partitions number in inc…
maxim-lixakov Jan 25, 2024
2b4fdba
[DOP-11752] - update tests
maxim-lixakov Jan 26, 2024
56c83d1
[DOP-11752] - fix incremental_with_new_partition test
maxim-lixakov Jan 26, 2024
dbe169c
[DOP-11904] - add has_data, raise_if_no_data methods in DBReader
maxim-lixakov Jan 29, 2024
b38bb9b
[DOP-11904] - update fixtures for Kafka tests
maxim-lixakov Jan 30, 2024
51dab70
[DOP-11904] - customize Oracle and MSSQL get_sql_query to handle limit
maxim-lixakov Jan 30, 2024
6515214
[DOP-11904] - customize Oracle and MSSQL get_sql_query, add unit tests
maxim-lixakov Jan 30, 2024
69cf0b5
[DOP-11904] - minor bug fix in fetching_hwm type
maxim-lixakov Jan 30, 2024
949891d
[DOP-11904] - add docs for using .has_data inside contextlib
maxim-lixakov Jan 31, 2024
79b64b6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
e8c3d33
[DOP-11904] - add documentation and tests for using has_data out of c…
maxim-lixakov Jan 31, 2024
3e53e63
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
922187a
[DOP-11904] - update has_data docstrings
maxim-lixakov Jan 31, 2024
ae96f7f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
ad045a1
[DOP-11904] - update has_data docstrings
maxim-lixakov Jan 31, 2024
b9c677d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
c9b9495
[DOP-11904] - update docstrings
maxim-lixakov Jan 31, 2024
7b714ed
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
6a59169
Update onetl/db/db_reader/db_reader.py
maxim-lixakov Jan 31, 2024
37c0a47
[DOP-11904] - update docstrings
maxim-lixakov Jan 31, 2024
d1bc4df
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2024
edd80ce
[DOP-11904] - update docstrings
maxim-lixakov Jan 31, 2024
3 changes: 3 additions & 0 deletions README.rst
@@ -337,6 +337,9 @@ Read data from MSSQL, transform & write to Hive.
options=MSSQL.ReadOptions(fetchsize=10000),
)

# check that there is data in the table, otherwise raise an exception
reader.raise_if_no_data()

# Read data to DataFrame
df = reader.run()
df.printSchema()
1 change: 1 addition & 0 deletions docs/changelog/next_release/203.feature.rst
@@ -0,0 +1 @@
Add ``has_data``, ``raise_if_no_data`` methods to ``DBReader`` class.
4 changes: 3 additions & 1 deletion docs/db/db_reader.rst
@@ -9,6 +9,8 @@ DB Reader

DBReader
DBReader.run
DBReader.has_data
DBReader.raise_if_no_data

.. autoclass:: DBReader
:members: run
:members: run, has_data, raise_if_no_data
23 changes: 23 additions & 0 deletions onetl/connection/db_connection/mssql/dialect.py
@@ -27,6 +27,29 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"{partition_column} % {num_partitions}"

def get_sql_query(
self,
table: str,
columns: list[str] | None = None,
where: str | list[str] | None = None,
hint: str | None = None,
limit: int | None = None,
compact: bool = False,
) -> str:
query = super().get_sql_query(
table=table,
columns=columns,
where=where,
hint=hint,
limit=0 if limit == 0 else None,
compact=compact,
)
# MSSQL-specific handling for the LIMIT clause using TOP
if limit is not None and limit > 0:
query = query.replace("SELECT", f"SELECT TOP {limit}", 1)

return query

def _serialize_datetime(self, value: datetime) -> str:
result = value.isoformat()
return f"CAST('{result}' AS datetime2)"
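For reference, a minimal sketch of the query shape this MSSQL override aims at; the table, columns and predicate below are hypothetical, and the base-dialect output for limit=0 is not reproduced here:

    # Hypothetical example: MSSQL has no LIMIT clause, so a positive limit
    # is injected as TOP right after the first SELECT keyword.
    base_query = "SELECT id, name FROM dbo.users WHERE is_active = 1"
    limit = 1
    if limit is not None and limit > 0:
        base_query = base_query.replace("SELECT", f"SELECT TOP {limit}", 1)
    print(base_query)
    # SELECT TOP 1 id, name FROM dbo.users WHERE is_active = 1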
14 changes: 13 additions & 1 deletion onetl/connection/db_connection/oracle/dialect.py
@@ -33,12 +33,24 @@ def get_sql_query(
new_columns = columns or ["*"]
if len(new_columns) > 1:
new_columns = [table + ".*" if column.strip() == "*" else column for column in new_columns]

where = where or []
if isinstance(where, str):
where = [where]

if limit is not None:
if limit == 0:
where = ["1=0"]
else:
# Oracle does not support LIMIT
where.append(f"ROWNUM <= {limit}")

return super().get_sql_query(
table=table,
columns=new_columns,
where=where,
hint=hint,
limit=limit,
limit=None,
compact=compact,
)

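Likewise, a minimal sketch of the Oracle behavior with hypothetical predicates: a positive limit becomes a ROWNUM condition appended to the WHERE clause, while limit=0 collapses the filter into an always-false "1=0".

    # Hypothetical example mirroring the Oracle override above: LIMIT is not
    # supported, so row limiting is expressed through the WHERE clause.
    def where_with_limit(where, limit):
        if limit is None:
            return where
        if limit == 0:
            return ["1=0"]  # matches no rows
        return [*where, f"ROWNUM <= {limit}"]

    print(where_with_limit(["status = 'ACTIVE'"], 1))
    # ["status = 'ACTIVE'", 'ROWNUM <= 1']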
103 changes: 99 additions & 4 deletions onetl/db/db_reader/db_reader.py
@@ -17,6 +17,7 @@
ContainsGetDFSchemaMethod,
ContainsGetMinMaxValues,
)
from onetl.exception import NoDataError
from onetl.hooks import slot, support_hooks
from onetl.hwm import AutoDetectHWM, Edge, Window
from onetl.impl import FrozenModel, GenericOptions
@@ -501,6 +502,90 @@ def validate_options(cls, options, values):

return None

@slot
def has_data(self) -> bool:
"""Returns ``True`` if there is some data in the source, ``False`` otherwise.

.. warning::

If :etl-entities:`hwm <hwm/index.html>` is used, the method should be called inside the :ref:`strategy` context. And vice versa: if HWM is not used, this method should not be called within a strategy.

Raises
------
RuntimeError

The current strategy is not compatible with the HWM parameter.

Examples
--------

.. code:: python

reader = DBReader(...)

# handle the situation when there is no data in the source
if reader.has_data():
df = reader.run()
else:
# implement your handling logic here
...
"""
self._check_strategy()

if not self._connection_checked:
self._log_parameters()
self.connection.check()

window, limit = self._calculate_window_and_limit()
if limit == 0:
return False

df = self.connection.read_source_as_df(
source=str(self.source),
columns=self.columns,
hint=self.hint,
where=self.where,
df_schema=self.df_schema,
window=window,
limit=1,
**self._get_read_kwargs(),
)

return bool(df.take(1))

@slot
def raise_if_no_data(self) -> None:
"""Raises exception ``NoDataError`` if source does not contain any data.

.. warning::

If :etl-entities:`hwm <hwm/index.html>` is used, the method should be called inside the :ref:`strategy` context. And vice versa: if HWM is not used, this method should not be called within a strategy.

Raises
------
RuntimeError

The current strategy is not compatible with the HWM parameter.

:obj:`onetl.exception.NoDataError`

There is no data in the source.

Examples
--------

.. code:: python

reader = DBReader(...)

# before creating read SparkDF, ensure that there is some data in the source
reader.raise_if_no_data()
df = reader.run()
"""

if not self.has_data():
raise NoDataError(f"No data in the source: {self.source}")

@slot
def run(self) -> DataFrame:
"""
@@ -510,6 +595,10 @@ def run(self) -> DataFrame:

This method can return different results depending on :ref:`strategy`

.. warning::

If :etl-entities:`hwm <hwm/index.html>` is used, the method should be called inside the :ref:`strategy` context. And vice versa: if HWM is not used, this method should not be called within a strategy.

Returns
-------
df : pyspark.sql.dataframe.DataFrame
@@ -541,6 +630,12 @@ def run(self) -> DataFrame:
self._connection_checked = True

window, limit = self._calculate_window_and_limit()

# update the HWM with the stop value
if self.hwm and window:
strategy: HWMStrategy = StrategyManager.get_current() # type: ignore[assignment]
strategy.update_hwm(window.stop_at.value)

df = self.connection.read_source_as_df(
source=str(self.source),
columns=self.columns,
@@ -562,7 +657,9 @@ def _check_strategy(self):

if self.hwm:
if not isinstance(strategy, HWMStrategy):
raise RuntimeError(f"{class_name}(hwm=...) cannot be used with {strategy_name}")
raise RuntimeError(
f"{class_name}(hwm=...) cannot be used with {strategy_name}. Check documentation DBReader.has_data(): https://onetl.readthedocs.io/en/stable/db/db_reader.html#onetl.db.db_reader.db_reader.DBReader.has_data.",
)
self._prepare_hwm(strategy, self.hwm)

elif isinstance(strategy, HWMStrategy):
@@ -578,7 +675,7 @@ def _prepare_hwm(self, strategy: HWMStrategy, hwm: ColumnHWM):
strategy.fetch_hwm()
return

if not isinstance(strategy.hwm, ColumnHWM) or strategy.hwm.name != hwm.name:
if not isinstance(strategy.hwm, (ColumnHWM, KeyValueHWM)) or strategy.hwm.name != hwm.name:
# raised when more than one process runs on the same table within one strategy but with different HWM columns
# example: test_postgres_strategy_incremental_hwm_set_twice
error_message = textwrap.dedent(
@@ -673,7 +770,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no
if start_value is not None and stop_value is not None:
# we already have start and stop values, nothing to do
window = Window(self.hwm.expression, start_from=strategy.current, stop_at=strategy.next)
strategy.update_hwm(window.stop_at.value)
return window, None

if not isinstance(self.connection, ContainsGetMinMaxValues):
@@ -737,7 +833,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no
stop_at=Edge(value=max_value),
)

strategy.update_hwm(window.stop_at.value)
return window, None

def _log_parameters(self) -> None:
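Tying the new methods to the strategy requirement spelled out in the docstrings above, a sketch of the intended call pattern; the connection object, source table and HWM name/expression are placeholders rather than part of this PR:

    # Sketch only: connection, source and HWM details are placeholders.
    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy

    reader = DBReader(
        connection=postgres,  # any supported connection created earlier
        source="schema.table",
        hwm=DBReader.AutoDetectHWM(name="unique_hwm_name", expression="updated_at"),
    )

    # hwm is set, so both checks must run inside the strategy context
    with IncrementalStrategy():
        if reader.has_data():
            df = reader.run()
        else:
            ...  # nothing new to read; reader.raise_if_no_data() would raise NoDataError here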
32 changes: 32 additions & 0 deletions tests/fixtures/processing/fixtures.py
@@ -76,3 +76,35 @@ def load_table_data(prepare_schema_table, processing):
)

return prepare_schema_table


@pytest.fixture
def kafka_topic(processing, request):
topic = secrets.token_hex(6)
processing.create_topic(topic, num_partitions=1)

def delete_topic():
processing.delete_topic([topic])

request.addfinalizer(delete_topic)
return topic


@pytest.fixture
def kafka_dataframe_schema():
from pyspark.sql.types import (
FloatType,
LongType,
StringType,
StructField,
StructType,
)

return StructType(
[
StructField("id_int", LongType(), nullable=True),
StructField("text_string", StringType(), nullable=True),
StructField("hwm_int", LongType(), nullable=True),
StructField("float_value", FloatType(), nullable=True),
],
)
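A short sketch of how a test might consume these new fixtures; it relies only on what the fixture bodies above guarantee (a fresh single-partition topic with a random 12-character name, and a fixed four-field schema):

    # Sketch only: exercises the kafka_topic and kafka_dataframe_schema fixtures.
    def test_kafka_fixtures_shape(kafka_topic, kafka_dataframe_schema):
        # secrets.token_hex(6) yields 12 hex characters for the topic name
        assert len(kafka_topic) == 12

        # the schema describes the expected payload of Kafka test messages
        assert [field.name for field in kafka_dataframe_schema.fields] == [
            "id_int",
            "text_string",
            "hwm_int",
            "float_value",
        ]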
@@ -338,6 +338,11 @@ def test_clickhouse_reader_snapshot_nothing_to_read(spark, processing, prepare_s
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -352,8 +357,12 @@ def test_clickhouse_reader_snapshot_nothing_to_read(spark, processing, prepare_s
# .run() is not called, but dataframes are lazy, so it now contains all data from the source
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# check that the source now has data
assert reader.has_data()

# read data explicitly
df = reader.run()

processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span
@@ -280,6 +280,11 @@ def test_greenplum_reader_snapshot_nothing_to_read(spark, processing, prepare_sc
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -294,8 +299,12 @@ def test_greenplum_reader_snapshot_nothing_to_read(spark, processing, prepare_sc
# .run() is not called, but dataframes are lazy, so it now contains all data from the source
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# check that the source now has data
assert reader.has_data()

# read data explicitly
df = reader.run()

processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span
@@ -220,6 +220,11 @@ def test_hive_reader_snapshot_nothing_to_read(spark, processing, prepare_schema_
first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end)
second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end)

with pytest.raises(Exception, match="No data in the source:"):
reader.raise_if_no_data()

assert not reader.has_data()

# no data yet, nothing to read
df = reader.run()
assert not df.count()
@@ -234,8 +239,12 @@ def test_hive_reader_snapshot_nothing_to_read(spark, processing, prepare_schema_
# .run() is not called, but dataframes are lazy, so it now contains all data from the source
processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# check that the source now has data
assert reader.has_data()

# read data explicitly
df = reader.run()

processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int")

# insert second span