diff --git a/README.rst b/README.rst
index 01ff7a7f5..6199023e7 100644
--- a/README.rst
+++ b/README.rst
@@ -337,6 +337,9 @@ Read data from MSSQL, transform & write to Hive.
         options=MSSQL.ReadOptions(fetchsize=10000),
     )
 
+    # raise an exception if there is no data in the table
+    reader.raise_if_no_data()
+
     # Read data to DataFrame
     df = reader.run()
     df.printSchema()
diff --git a/docs/changelog/next_release/203.feature.rst b/docs/changelog/next_release/203.feature.rst
new file mode 100644
index 000000000..304a02e33
--- /dev/null
+++ b/docs/changelog/next_release/203.feature.rst
@@ -0,0 +1 @@
+Add ``has_data`` and ``raise_if_no_data`` methods to the ``DBReader`` class.
diff --git a/docs/db/db_reader.rst b/docs/db/db_reader.rst
index 4e256d42c..3571be7bc 100644
--- a/docs/db/db_reader.rst
+++ b/docs/db/db_reader.rst
@@ -9,6 +9,8 @@ DB Reader
 
     DBReader
     DBReader.run
+    DBReader.has_data
+    DBReader.raise_if_no_data
 
 .. autoclass:: DBReader
-    :members: run
+    :members: run, has_data, raise_if_no_data
diff --git a/onetl/connection/db_connection/mssql/dialect.py b/onetl/connection/db_connection/mssql/dialect.py
index f39568423..c5cbe9e93 100644
--- a/onetl/connection/db_connection/mssql/dialect.py
+++ b/onetl/connection/db_connection/mssql/dialect.py
@@ -27,6 +27,29 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
     def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
         return f"{partition_column} % {num_partitions}"
 
+    def get_sql_query(
+        self,
+        table: str,
+        columns: list[str] | None = None,
+        where: str | list[str] | None = None,
+        hint: str | None = None,
+        limit: int | None = None,
+        compact: bool = False,
+    ) -> str:
+        query = super().get_sql_query(
+            table=table,
+            columns=columns,
+            where=where,
+            hint=hint,
+            limit=0 if limit == 0 else None,
+            compact=compact,
+        )
+        # MSSQL-specific handling for the LIMIT clause using TOP
+        if limit is not None and limit > 0:
+            query = query.replace("SELECT", f"SELECT TOP {limit}", 1)
+
+        return query
+
     def _serialize_datetime(self, value: datetime) -> str:
         result = value.isoformat()
         return f"CAST('{result}' AS datetime2)"
diff --git a/onetl/connection/db_connection/oracle/dialect.py b/onetl/connection/db_connection/oracle/dialect.py
index 9484524fd..d8142842b 100644
--- a/onetl/connection/db_connection/oracle/dialect.py
+++ b/onetl/connection/db_connection/oracle/dialect.py
@@ -33,12 +33,24 @@ def get_sql_query(
         new_columns = columns or ["*"]
         if len(new_columns) > 1:
             new_columns = [table + ".*" if column.strip() == "*" else column for column in new_columns]
+
+        where = where or []
+        if isinstance(where, str):
+            where = [where]
+
+        if limit is not None:
+            if limit == 0:
+                where = ["1=0"]
+            else:
+                # Oracle does not support LIMIT
+                where.append(f"ROWNUM <= {limit}")
+
         return super().get_sql_query(
             table=table,
             columns=new_columns,
             where=where,
             hint=hint,
-            limit=limit,
+            limit=None,
             compact=compact,
         )
 
diff --git a/onetl/db/db_reader/db_reader.py b/onetl/db/db_reader/db_reader.py
index b899faab8..2a41c69ff 100644
--- a/onetl/db/db_reader/db_reader.py
+++ b/onetl/db/db_reader/db_reader.py
@@ -17,6 +17,7 @@
     ContainsGetDFSchemaMethod,
     ContainsGetMinMaxValues,
 )
+from onetl.exception import NoDataError
 from onetl.hooks import slot, support_hooks
 from onetl.hwm import AutoDetectHWM, Edge, Window
 from onetl.impl import FrozenModel, GenericOptions
@@ -501,6 +502,97 @@ def validate_options(cls, options, values):
 
         return None
 
+    @slot
+    def has_data(self) -> bool:
+        """Returns ``True`` if there is some data in the source, ``False`` otherwise. |support_hooks|
+
+        .. note::
+
+            This method can return different results depending on :ref:`strategy`
+
+        .. warning::
+
+            If :etl-entities:`hwm ` is used, then this method should be called inside the :ref:`strategy` context. And vice versa: if HWM is not used, this method should not be called within a strategy.
+
+        Raises
+        ------
+        RuntimeError
+
+            Current strategy is not compatible with the HWM parameter.
+
+        Examples
+        --------
+
+        .. code:: python
+
+            reader = DBReader(...)
+
+            # handle the case when there is no data in the source
+            if reader.has_data():
+                df = reader.run()
+            else:
+                # implement your handling logic here
+                ...
+        """
+        self._check_strategy()
+
+        if not self._connection_checked:
+            self._log_parameters()
+            self.connection.check()
+
+        window, limit = self._calculate_window_and_limit()
+        if limit == 0:
+            return False
+
+        df = self.connection.read_source_as_df(
+            source=str(self.source),
+            columns=self.columns,
+            hint=self.hint,
+            where=self.where,
+            df_schema=self.df_schema,
+            window=window,
+            limit=1,
+            **self._get_read_kwargs(),
+        )
+
+        return bool(df.take(1))
+
+    @slot
+    def raise_if_no_data(self) -> None:
+        """Raises a ``NoDataError`` exception if the source does not contain any data. |support_hooks|
+
+        .. note::
+
+            This method can return different results depending on :ref:`strategy`
+
+        .. warning::
+
+            If :etl-entities:`hwm ` is used, then this method should be called inside the :ref:`strategy` context. And vice versa: if HWM is not used, this method should not be called within a strategy.
+
+        Raises
+        ------
+        RuntimeError
+
+            Current strategy is not compatible with the HWM parameter.
+
+        :obj:`onetl.exception.NoDataError`
+
+            There is no data in the source.
+
+        Examples
+        --------
+
+        .. code:: python
+
+            reader = DBReader(...)
+
+            # ensure that there is some data in the source before reading it using Spark
+            reader.raise_if_no_data()
+        """
+
+        if not self.has_data():
+            raise NoDataError(f"No data in the source: {self.source}")
+
     @slot
     def run(self) -> DataFrame:
         """
@@ -510,6 +602,10 @@ def run(self) -> DataFrame:
 
             This method can return different results depending on :ref:`strategy`
 
+        .. warning::
+
+            If :etl-entities:`hwm ` is used, then this method should be called inside the :ref:`strategy` context. And vice versa: if HWM is not used, this method should not be called within a strategy.
+
         Returns
         -------
         df : pyspark.sql.dataframe.DataFrame
@@ -541,6 +637,12 @@ def run(self) -> DataFrame:
             self._connection_checked = True
 
         window, limit = self._calculate_window_and_limit()
+
+        # update the HWM with the stop value
+        if self.hwm and window:
+            strategy: HWMStrategy = StrategyManager.get_current()  # type: ignore[assignment]
+            strategy.update_hwm(window.stop_at.value)
+
         df = self.connection.read_source_as_df(
             source=str(self.source),
             columns=self.columns,
@@ -562,7 +664,9 @@ def _check_strategy(self):
         if self.hwm:
             if not isinstance(strategy, HWMStrategy):
-                raise RuntimeError(f"{class_name}(hwm=...) cannot be used with {strategy_name}")
+                raise RuntimeError(
+                    f"{class_name}(hwm=...) cannot be used with {strategy_name}. 
Check documentation DBReader.has_data(): https://onetl.readthedocs.io/en/stable/db/db_reader.html#onetl.db.db_reader.db_reader.DBReader.has_data.", + ) self._prepare_hwm(strategy, self.hwm) elif isinstance(strategy, HWMStrategy): @@ -578,7 +682,7 @@ def _prepare_hwm(self, strategy: HWMStrategy, hwm: ColumnHWM): strategy.fetch_hwm() return - if not isinstance(strategy.hwm, ColumnHWM) or strategy.hwm.name != hwm.name: + if not isinstance(strategy.hwm, (ColumnHWM, KeyValueHWM)) or strategy.hwm.name != hwm.name: # exception raised when inside one strategy >1 processes on the same table but with different hwm columns # are executed, example: test_postgres_strategy_incremental_hwm_set_twice error_message = textwrap.dedent( @@ -673,7 +777,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no if start_value is not None and stop_value is not None: # we already have start and stop values, nothing to do window = Window(self.hwm.expression, start_from=strategy.current, stop_at=strategy.next) - strategy.update_hwm(window.stop_at.value) return window, None if not isinstance(self.connection, ContainsGetMinMaxValues): @@ -737,7 +840,6 @@ def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # no stop_at=Edge(value=max_value), ) - strategy.update_hwm(window.stop_at.value) return window, None def _log_parameters(self) -> None: diff --git a/onetl/hooks/hook.py b/onetl/hooks/hook.py index e49039a3c..924d8912b 100644 --- a/onetl/hooks/hook.py +++ b/onetl/hooks/hook.py @@ -62,8 +62,7 @@ class Hook(Generic[T]): # noqa: WPS338 from onetl.hooks.hook import Hook, HookPriority - def some_func(*args, **kwargs): - ... + def some_func(*args, **kwargs): ... hook = Hook(callback=some_func, enabled=True, priority=HookPriority.FIRST) @@ -209,8 +208,7 @@ def __call__(self, *args, **kwargs) -> T | ContextDecorator: from onetl.hooks.hook import Hook, HookPriority - def some_func(*args, **kwargs): - ... + def some_func(*args, **kwargs): ... hook = Hook(callback=some_func) diff --git a/onetl/hooks/slot.py b/onetl/hooks/slot.py index b6f80efe9..e65366746 100644 --- a/onetl/hooks/slot.py +++ b/onetl/hooks/slot.py @@ -139,8 +139,7 @@ def _prepare_hook_args( @support_hooks class MyClass: @slot - def method(self, some, named="abc"): - ... + def method(self, some, named="abc"): ... then hook should have a compatible signature, like these ones: @@ -148,22 +147,19 @@ def method(self, some, named="abc"): @MyClass.method.bind @hook - def callback(self, some, named): - ... + def callback(self, some, named): ... .. code:: python @MyClass.method.bind @hook - def callback(self, some, **kwargs): - ... + def callback(self, some, **kwargs): ... .. code:: python @MyClass.method.bind @hook - def callback(my_class_instance, *args, **kwargs): - ... + def callback(my_class_instance, *args, **kwargs): ... .. note:: @@ -561,14 +557,12 @@ def suspend_hooks(self): @support_hooks class MyClass: @slot - def my_method(self, arg): - ... + def my_method(self, arg): ... @MyClass.my_method.bind @hook - def callback1(self, arg): - ... + def callback1(self, arg): ... obj = MyClass() @@ -598,14 +592,12 @@ def resume_hooks(self): @support_hooks class MyClass: @slot - def my_method(self, arg): - ... + def my_method(self, arg): ... @MyClass.my_method.bind @hook - def callback1(self, arg): - ... + def callback1(self, arg): ... obj = MyClass() @@ -656,36 +648,30 @@ def slot(method: Method) -> Method: @support_hooks class MyClass: @slot - def my_method(self, arg): - ... + def my_method(self, arg): ... 
@slot # decorator should be on top of all other decorators @classmethod - def class_method(cls): - ... + def class_method(cls): ... @slot # decorator should be on top of all other decorators @staticmethod - def static_method(arg): - ... + def static_method(arg): ... @MyClass.my_method.bind @hook - def callback1(self, arg): - ... + def callback1(self, arg): ... @MyClass.class_method.bind @hook - def callback2(cls): - ... + def callback2(cls): ... @MyClass.static_method.bind @hook - def callback3(arg): - ... + def callback3(arg): ... obj = MyClass() diff --git a/onetl/hooks/support_hooks.py b/onetl/hooks/support_hooks.py index 2500653f4..8558ae59d 100644 --- a/onetl/hooks/support_hooks.py +++ b/onetl/hooks/support_hooks.py @@ -111,13 +111,11 @@ def suspend_hooks(cls: type) -> None: @support_hooks class MyClass: @slot - def my_method(self, arg): - ... + def my_method(self, arg): ... @MyClass.my_method.hook - def callback(self, arg): - ... + def callback(self, arg): ... obj = MyClass() @@ -146,13 +144,11 @@ def resume_hooks(cls: type) -> None: @support_hooks class MyClass: @slot - def my_method(self, arg): - ... + def my_method(self, arg): ... @MyClass.my_method.hook - def callback(self, arg): - ... + def callback(self, arg): ... obj = MyClass() @@ -190,13 +186,11 @@ def support_hooks(cls: Klass) -> Klass: @support_hooks class MyClass: @slot - def my_method(self, arg): - ... + def my_method(self, arg): ... @MyClass.my_method.hook - def callback(self, arg): - ... + def callback(self, arg): ... MyClass().my_method() # will execute callback function diff --git a/onetl/hwm/store/hwm_class_registry.py b/onetl/hwm/store/hwm_class_registry.py index d9a2ce674..9a50c2f8a 100644 --- a/onetl/hwm/store/hwm_class_registry.py +++ b/onetl/hwm/store/hwm_class_registry.py @@ -77,8 +77,7 @@ def register_spark_type_to_hwm_type_mapping(*type_names: str): @register_spark_type_to_hwm_type_mapping("somename", "anothername") - class MyHWM(HWM): - ... + class MyHWM(HWM): ... assert SparkTypeToHWM.get("somename") == MyClass diff --git a/onetl/hwm/store/yaml_hwm_store.py b/onetl/hwm/store/yaml_hwm_store.py index 31064d572..0000ba4b2 100644 --- a/onetl/hwm/store/yaml_hwm_store.py +++ b/onetl/hwm/store/yaml_hwm_store.py @@ -50,8 +50,7 @@ def default_hwm_store_class(klass: type[BaseHWMStore]) -> type[BaseHWMStore]: @default_hwm_store_class - class MyClass(BaseHWMStore): - ... + class MyClass(BaseHWMStore): ... 
HWMStoreClassRegistry.get() == MyClass # default diff --git a/tests/fixtures/processing/fixtures.py b/tests/fixtures/processing/fixtures.py index 38b614441..3f541f692 100644 --- a/tests/fixtures/processing/fixtures.py +++ b/tests/fixtures/processing/fixtures.py @@ -76,3 +76,35 @@ def load_table_data(prepare_schema_table, processing): ) return prepare_schema_table + + +@pytest.fixture +def kafka_topic(processing, request): + topic = secrets.token_hex(6) + processing.create_topic(topic, num_partitions=1) + + def delete_topic(): + processing.delete_topic([topic]) + + request.addfinalizer(delete_topic) + return topic + + +@pytest.fixture +def kafka_dataframe_schema(): + from pyspark.sql.types import ( + FloatType, + LongType, + StringType, + StructField, + StructType, + ) + + return StructType( + [ + StructField("id_int", LongType(), nullable=True), + StructField("text_string", StringType(), nullable=True), + StructField("hwm_int", LongType(), nullable=True), + StructField("float_value", FloatType(), nullable=True), + ], + ) diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_clickhouse_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_clickhouse_reader_integration.py index 692c3ea45..e38de7413 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_clickhouse_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_clickhouse_reader_integration.py @@ -338,6 +338,11 @@ def test_clickhouse_reader_snapshot_nothing_to_read(spark, processing, prepare_s first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -352,8 +357,12 @@ def test_clickhouse_reader_snapshot_nothing_to_read(spark, processing, prepare_s # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_greenplum_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_greenplum_reader_integration.py index 0e60a6689..f0ede8a82 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_greenplum_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_greenplum_reader_integration.py @@ -280,6 +280,11 @@ def test_greenplum_reader_snapshot_nothing_to_read(spark, processing, prepare_sc first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -294,8 +299,12 @@ def 
test_greenplum_reader_snapshot_nothing_to_read(spark, processing, prepare_sc # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_hive_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_hive_reader_integration.py index 3a3a0a8a0..7831f835b 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_hive_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_hive_reader_integration.py @@ -220,6 +220,11 @@ def test_hive_reader_snapshot_nothing_to_read(spark, processing, prepare_schema_ first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -234,8 +239,12 @@ def test_hive_reader_snapshot_nothing_to_read(spark, processing, prepare_schema_ # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_kafka_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_kafka_reader_integration.py index 635e257b9..86f2b7e98 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_kafka_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_kafka_reader_integration.py @@ -1,6 +1,3 @@ -import json -import secrets - import pytest try: @@ -15,26 +12,6 @@ pytestmark = [pytest.mark.kafka, pytest.mark.db_connection, pytest.mark.connection] -@pytest.fixture(name="schema") -def dataframe_schema(): - from pyspark.sql.types import ( - FloatType, - LongType, - StringType, - StructField, - StructType, - ) - - return StructType( - [ - StructField("id_int", LongType(), nullable=True), - StructField("text_string", StringType(), nullable=True), - StructField("hwm_int", LongType(), nullable=True), - StructField("float_value", FloatType(), nullable=True), - ], - ) - - @pytest.fixture def kafka_schema(): from pyspark.sql.types import ( @@ -100,26 +77,7 @@ def kafka_schema_with_headers(): return schema # noqa: WPS331 -@pytest.fixture(name="kafka_processing") -def create_kafka_data(spark): - from tests.fixtures.processing.kafka import KafkaProcessing - - topic = secrets.token_hex(5) - proc = KafkaProcessing() - df = proc.create_spark_df(spark) - rows = [row.asDict() for row in df.collect()] - - for row_to_send in rows: - proc.send_message(topic, json.dumps(row_to_send).encode("utf-8")) - - yield topic, proc, df - # Release - 
proc.delete_topic([topic]) - - -def test_kafka_reader(spark, kafka_processing, schema): - topic, processing, expected_df = kafka_processing - +def test_kafka_reader(spark, processing, kafka_dataframe_schema, kafka_topic): kafka = Kafka( spark=spark, addresses=[f"{processing.host}:{processing.port}"], @@ -128,16 +86,20 @@ def test_kafka_reader(spark, kafka_processing, schema): reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, ) - df = reader.run() - processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=expected_df) + first_span = processing.create_pandas_df(min_id=0, max_id=100) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) + df = reader.run() + processing.assert_equal_df( + processing.json_deserialize(df, df_schema=kafka_dataframe_schema), + other_frame=first_span, + ) -def test_kafka_reader_columns_and_types_without_headers(spark, kafka_processing, kafka_schema): - topic, processing, _ = kafka_processing +def test_kafka_reader_columns_and_types_without_headers(spark, processing, kafka_schema, kafka_topic): kafka = Kafka( spark=spark, addresses=[f"{processing.host}:{processing.port}"], @@ -146,20 +108,21 @@ def test_kafka_reader_columns_and_types_without_headers(spark, kafka_processing, reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, ) + first_span = processing.create_pandas_df(min_id=0, max_id=100) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) + df = reader.run() assert df.schema == kafka_schema # headers aren't included in schema if includeHeaders=False -def test_kafka_reader_columns_and_types_with_headers(spark, kafka_processing, kafka_schema_with_headers): +def test_kafka_reader_columns_and_types_with_headers(spark, processing, kafka_schema_with_headers, kafka_topic): if get_spark_version(spark).major < 3: pytest.skip("Spark 3.x or later is required to write/read 'headers' from Kafka messages") - topic, processing, _ = kafka_processing - kafka = Kafka( spark=spark, addresses=[f"{processing.host}:{processing.port}"], @@ -169,18 +132,19 @@ def test_kafka_reader_columns_and_types_with_headers(spark, kafka_processing, ka # Check that the DataFrame also has a "headers" column when includeHeaders=True reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, options=Kafka.ReadOptions(includeHeaders=True), ) + first_span = processing.create_pandas_df(min_id=0, max_id=100) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) + df = reader.run() assert df.schema == kafka_schema_with_headers -def test_kafka_reader_topic_does_not_exist(spark, kafka_processing): - _, processing, _ = kafka_processing - +def test_kafka_reader_topic_does_not_exist(spark, processing): kafka = Kafka( spark=spark, addresses=[f"{processing.host}:{processing.port}"], @@ -197,11 +161,12 @@ def test_kafka_reader_topic_does_not_exist(spark, kafka_processing): @pytest.mark.parametrize("group_id_option", ["group.id", "groupIdPrefix"]) -def test_kafka_reader_with_group_id(group_id_option, spark, kafka_processing, schema): +def test_kafka_reader_with_group_id(group_id_option, spark, processing, kafka_dataframe_schema, kafka_topic): if get_spark_version(spark).major < 3: pytest.skip("Spark 3.x or later is required to pas group.id") - topic, processing, expected_df = kafka_processing + first_span = processing.create_pandas_df(min_id=0, max_id=100) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) kafka = Kafka( spark=spark, @@ -212,12 +177,81 @@ def 
test_kafka_reader_with_group_id(group_id_option, spark, kafka_processing, sc reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, ) df = reader.run() - processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=expected_df) + processing.assert_equal_df( + processing.json_deserialize(df, df_schema=kafka_dataframe_schema), + other_frame=first_span, + ) # Spark does not report to Kafka which messages were read, so Kafka does not remember latest offsets for groupId # https://stackoverflow.com/a/64003569 df = reader.run() - processing.assert_equal_df(processing.json_deserialize(df, df_schema=schema), other_frame=expected_df) + processing.assert_equal_df( + processing.json_deserialize(df, df_schema=kafka_dataframe_schema), + other_frame=first_span, + ) + + +def test_kafka_reader_snapshot_nothing_to_read(spark, processing, kafka_dataframe_schema, kafka_topic): + kafka = Kafka( + spark=spark, + addresses=[f"{processing.host}:{processing.port}"], + cluster="cluster", + ) + + reader = DBReader( + connection=kafka, + source=kafka_topic, + ) + + # 0..100 + first_span_begin = 0 + first_span_end = 100 + + # 110..210 + second_span_begin = 110 + second_span_end = 210 + + first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) + second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + + # no data yet, nothing to read + df = reader.run() + assert not df.count() + + # insert first span + processing.insert_pandas_df_into_topic(first_span, kafka_topic) + + # .run() is not called, but dataframes are lazy, so it now contains all data from the source + deserialized_df = processing.json_deserialize(df, df_schema=kafka_dataframe_schema) + processing.assert_equal_df(df=deserialized_df, other_frame=first_span, order_by="id_int") + + # check that read df has data + assert reader.has_data() + + # read data explicitly + df = reader.run() + + deserialized_df = processing.json_deserialize(df, df_schema=kafka_dataframe_schema) + processing.assert_equal_df(df=deserialized_df, other_frame=first_span, order_by="id_int") + + # insert second span + processing.insert_pandas_df_into_topic(second_span, kafka_topic) + total_span = pandas.concat([first_span, second_span], ignore_index=True) + + # .run() is not called, but dataframes are lazy, so it now contains all data from the source + deserialized_df = processing.json_deserialize(df, df_schema=kafka_dataframe_schema) + processing.assert_equal_df(df=deserialized_df, other_frame=total_span, order_by="id_int") + + # read data explicitly + df = reader.run() + + deserialized_df = processing.json_deserialize(df, df_schema=kafka_dataframe_schema) + processing.assert_equal_df(df=deserialized_df, other_frame=total_span, order_by="id_int") diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mongodb_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mongodb_reader_integration.py index 5071270d2..b06527a0b 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mongodb_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mongodb_reader_integration.py @@ -160,6 +160,11 @@ def test_mongodb_reader_snapshot_nothing_to_read(spark, processing, prepare_sche first_span = 
processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -174,8 +179,12 @@ def test_mongodb_reader_snapshot_nothing_to_read(spark, processing, prepare_sche # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="_id") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="_id") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mssql_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mssql_reader_integration.py index 5409b85e9..781121e4b 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mssql_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mssql_reader_integration.py @@ -315,6 +315,11 @@ def test_mssql_reader_snapshot_nothing_to_read(spark, processing, prepare_schema first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -329,8 +334,12 @@ def test_mssql_reader_snapshot_nothing_to_read(spark, processing, prepare_schema # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mysql_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mysql_reader_integration.py index 65b922732..3f12746b8 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mysql_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_mysql_reader_integration.py @@ -331,6 +331,11 @@ def test_mysql_reader_snapshot_nothing_to_read(spark, processing, prepare_schema first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -345,8 +350,12 @@ def test_mysql_reader_snapshot_nothing_to_read(spark, processing, prepare_schema # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df 
has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_oracle_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_oracle_reader_integration.py index 126420c61..d9864967c 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_oracle_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_oracle_reader_integration.py @@ -315,6 +315,11 @@ def test_oracle_reader_snapshot_nothing_to_read(spark, processing, prepare_schem first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -329,8 +334,12 @@ def test_oracle_reader_snapshot_nothing_to_read(spark, processing, prepare_schem # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_postgres_reader_integration.py b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_postgres_reader_integration.py index bbb2ee472..248a575b9 100644 --- a/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_postgres_reader_integration.py +++ b/tests/tests_integration/tests_core_integration/tests_db_reader_integration/test_postgres_reader_integration.py @@ -377,6 +377,11 @@ def test_postgres_reader_snapshot_nothing_to_read(spark, processing, prepare_sch first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) + with pytest.raises(Exception, match="No data in the source:"): + reader.raise_if_no_data() + + assert not reader.has_data() + # no data yet, nothing to read df = reader.run() assert not df.count() @@ -391,8 +396,12 @@ def test_postgres_reader_snapshot_nothing_to_read(spark, processing, prepare_sch # .run() is not called, but dataframes are lazy, so it now contains all data from the source processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") + # check that read df has data + assert reader.has_data() + # read data explicitly df = reader.run() + processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") # insert second span diff --git a/tests/tests_integration/tests_strategy_integration/test_strategy_incremental_batch.py b/tests/tests_integration/tests_strategy_integration/test_strategy_incremental_batch.py index 5371e8e27..66c7ad31a 100644 --- a/tests/tests_integration/tests_strategy_integration/test_strategy_incremental_batch.py +++ b/tests/tests_integration/tests_strategy_integration/test_strategy_incremental_batch.py @@ -944,3 +944,27 @@ def 
test_postgres_strategy_incremental_batch_nothing_to_read(spark, processing, processing.assert_equal_df(df=df, other_frame=second_span, order_by="id_int") hwm = store.get_hwm(name=hwm_name) assert hwm.value == second_span_max + + +def test_postgres_has_data_outside_incremental_strategy(spark, processing, prepare_schema_table): + postgres = Postgres( + host=processing.host, + user=processing.user, + password=processing.password, + database=processing.database, + spark=spark, + ) + + reader = DBReader( + connection=postgres, + source=prepare_schema_table.full_name, + hwm=ColumnIntHWM(name=secrets.token_hex(5), expression="text_string"), + ) + + with pytest.raises( + RuntimeError, + match=re.escape( + "Check documentation DBReader.has_data(): ", + ), + ): + reader.has_data() diff --git a/tests/tests_integration/tests_strategy_integration/test_strategy_snapshot_batch.py b/tests/tests_integration/tests_strategy_integration/test_strategy_snapshot_batch.py index e1984ab05..4c8121dbc 100644 --- a/tests/tests_integration/tests_strategy_integration/test_strategy_snapshot_batch.py +++ b/tests/tests_integration/tests_strategy_integration/test_strategy_snapshot_batch.py @@ -696,3 +696,27 @@ def test_postgres_strategy_snapshot_batch_nothing_to_read(spark, processing, pre total_span = pandas.concat([first_span, second_span], ignore_index=True) processing.assert_equal_df(df=df, other_frame=total_span, order_by="id_int") + + +def test_postgres_has_data_outside_snapshot_batch_strategy(spark, processing, prepare_schema_table): + postgres = Postgres( + host=processing.host, + user=processing.user, + password=processing.password, + database=processing.database, + spark=spark, + ) + + reader = DBReader( + connection=postgres, + source=prepare_schema_table.full_name, + hwm=ColumnIntHWM(name=secrets.token_hex(5), expression="text_string"), + ) + + with pytest.raises( + RuntimeError, + match=re.escape( + "Check documentation DBReader.has_data(): ", + ), + ): + reader.has_data() diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_clickhouse.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_clickhouse.py index 9a628c3e6..77ec071b3 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_clickhouse.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_clickhouse.py @@ -145,6 +145,7 @@ def test_clickhouse_strategy_incremental_nothing_to_read(spark, processing, prep # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -165,6 +166,7 @@ def test_clickhouse_strategy_incremental_nothing_to_read(spark, processing, prep # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_greenplum.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_greenplum.py index 7a2be6b68..8159bca25 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_greenplum.py +++ 
b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_greenplum.py @@ -152,6 +152,7 @@ def test_greenplum_strategy_incremental_nothing_to_read(spark, processing, prepa # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -172,6 +173,7 @@ def test_greenplum_strategy_incremental_nothing_to_read(spark, processing, prepa # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py index c576a5d8c..6cc860f56 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_hive.py @@ -136,6 +136,7 @@ def test_hive_strategy_incremental_nothing_to_read(spark, processing, prepare_sc # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -156,6 +157,7 @@ def test_hive_strategy_incremental_nothing_to_read(spark, processing, prepare_sc # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py index b3df91241..1ba96cee1 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py @@ -11,26 +11,6 @@ pytestmark = pytest.mark.kafka -@pytest.fixture(name="schema") -def dataframe_schema(): - from pyspark.sql.types import ( - FloatType, - LongType, - StringType, - StructField, - StructType, - ) - - return StructType( - [ - StructField("id_int", LongType(), nullable=True), - StructField("text_string", StringType(), nullable=True), - StructField("hwm_int", LongType(), nullable=True), - StructField("float_value", FloatType(), nullable=True), - ], - ) - - @pytest.mark.parametrize( "num_partitions", [ @@ -42,13 +22,13 @@ def dataframe_schema(): def test_kafka_strategy_incremental( spark, processing, - schema, + kafka_dataframe_schema, + kafka_topic, num_partitions, ): from pyspark.sql.functions import count as spark_count hwm_type = KeyValueIntHWM - topic = secrets.token_hex(6) hwm_name = secrets.token_hex(5) store = HWMStoreStackManager.get_current() @@ -60,11 +40,11 @@ def test_kafka_strategy_incremental( # change the number of partitions for the Kafka topic to test work for different partitioning cases if num_partitions is not None: - processing.change_topic_partitions(topic, num_partitions) + processing.change_topic_partitions(kafka_topic, num_partitions) reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, hwm=DBReader.AutoDetectHWM(name=hwm_name, 
expression="offset"), ) @@ -82,7 +62,7 @@ def test_kafka_strategy_incremental( second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) # insert first span - processing.insert_pandas_df_into_topic(first_span, topic) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) # hwm is not in the store assert store.get_hwm(hwm_name) is None @@ -101,11 +81,11 @@ def test_kafka_strategy_incremental( assert hwm.value == partition_count_dict_first_df # all the data has been read - deserialized_first_df = processing.json_deserialize(first_df, df_schema=schema) + deserialized_first_df = processing.json_deserialize(first_df, df_schema=kafka_dataframe_schema) processing.assert_equal_df(df=deserialized_first_df, other_frame=first_span, order_by="id_int") # insert second span - processing.insert_pandas_df_into_topic(second_span, topic) + processing.insert_pandas_df_into_topic(second_span, kafka_topic) with IncrementalStrategy(): second_df = reader.run() @@ -118,7 +98,7 @@ def test_kafka_strategy_incremental( partition_count_dict_combined = {row["partition"]: row["count"] for row in partition_counts_combined.collect()} assert hwm.value == partition_count_dict_combined - deserialized_second_df = processing.json_deserialize(second_df, df_schema=schema) + deserialized_second_df = processing.json_deserialize(second_df, df_schema=kafka_dataframe_schema) processing.assert_equal_df(df=deserialized_second_df, other_frame=second_span, order_by="id_int") @@ -130,10 +110,15 @@ def test_kafka_strategy_incremental( 10, ], ) -def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, num_partitions): +def test_kafka_strategy_incremental_nothing_to_read( + spark, + processing, + kafka_dataframe_schema, + num_partitions, + kafka_topic, +): from pyspark.sql.functions import count as spark_count - topic = secrets.token_hex(6) hwm_name = secrets.token_hex(5) store = HWMStoreStackManager.get_current() @@ -145,11 +130,11 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n # change the number of partitions for the Kafka topic to test work for different partitioning cases if num_partitions is not None: - processing.change_topic_partitions(topic, num_partitions) + processing.change_topic_partitions(kafka_topic, num_partitions) reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, hwm=DBReader.AutoDetectHWM(name=hwm_name, expression="offset"), ) @@ -165,6 +150,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -172,7 +158,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n assert all(value == 0 for value in hwm.value.values()) # insert first span - processing.insert_pandas_df_into_topic(first_span, topic) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) # .run() is not called - dataframe still empty - HWM not updated assert not df.count() @@ -181,6 +167,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() first_df = reader.run() hwm = store.get_hwm(name=hwm_name) @@ -189,7 +176,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n partition_count_dict_first_df = {row["partition"]: row["count"] for row in partition_counts.collect()} assert hwm.value == 
partition_count_dict_first_df - deserialized_df = processing.json_deserialize(first_df, df_schema=schema) + deserialized_df = processing.json_deserialize(first_df, df_schema=kafka_dataframe_schema) processing.assert_equal_df(df=deserialized_df, other_frame=first_span, order_by="id_int") # no new data yet, nothing to read @@ -202,7 +189,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n assert hwm.value == partition_count_dict_first_df # insert second span - processing.insert_pandas_df_into_topic(second_span, topic) + processing.insert_pandas_df_into_topic(second_span, kafka_topic) # .run() is not called - dataframe still empty - HWM not updated assert not df.count() @@ -221,7 +208,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n partition_count_dict_combined = {row["partition"]: row["count"] for row in partition_counts_combined.collect()} assert hwm.value == partition_count_dict_combined - deserialized_df = processing.json_deserialize(df, df_schema=schema) + deserialized_df = processing.json_deserialize(df, df_schema=kafka_dataframe_schema) processing.assert_equal_df(df=deserialized_df, other_frame=second_span, order_by="id_int") @@ -235,13 +222,12 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n def test_kafka_strategy_incremental_with_new_partition( spark, processing, - schema, initial_partitions, additional_partitions, + kafka_topic, ): from pyspark.sql.functions import count as spark_count - topic = secrets.token_hex(6) hwm_name = secrets.token_hex(5) store = HWMStoreStackManager.get_current() @@ -253,12 +239,12 @@ def test_kafka_strategy_incremental_with_new_partition( reader = DBReader( connection=kafka, - source=topic, + source=kafka_topic, hwm=DBReader.AutoDetectHWM(name=hwm_name, expression="offset"), ) # Initial setup with `initial_partitions` partitions - processing.change_topic_partitions(topic, initial_partitions) + processing.change_topic_partitions(kafka_topic, initial_partitions) # 0..50 first_span_begin = 0 @@ -271,7 +257,7 @@ def test_kafka_strategy_incremental_with_new_partition( first_span = processing.create_pandas_df(min_id=first_span_begin, max_id=first_span_end) second_span = processing.create_pandas_df(min_id=second_span_begin, max_id=second_span_end) - processing.insert_pandas_df_into_topic(first_span, topic) + processing.insert_pandas_df_into_topic(first_span, kafka_topic) with IncrementalStrategy(): first_df = reader.run() @@ -284,8 +270,8 @@ def test_kafka_strategy_incremental_with_new_partition( hwm = store.get_hwm(name=hwm_name) first_run_hwm_keys_num = len(hwm.value.keys()) - processing.change_topic_partitions(topic, initial_partitions + additional_partitions) - processing.insert_pandas_df_into_topic(second_span, topic) + processing.change_topic_partitions(kafka_topic, initial_partitions + additional_partitions) + processing.insert_pandas_df_into_topic(second_span, kafka_topic) with IncrementalStrategy(): second_df = reader.run() diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mongodb.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mongodb.py index 888300a2d..3e8287257 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mongodb.py +++ 
b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mongodb.py @@ -181,6 +181,7 @@ def test_mongodb_strategy_incremental_nothing_to_read( # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -201,6 +202,7 @@ def test_mongodb_strategy_incremental_nothing_to_read( # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="_id") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mssql.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mssql.py index 90447d3b3..a03db2759 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mssql.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mssql.py @@ -152,6 +152,7 @@ def test_mssql_strategy_incremental_nothing_to_read(spark, processing, prepare_s # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -172,6 +173,7 @@ def test_mssql_strategy_incremental_nothing_to_read(spark, processing, prepare_s # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mysql.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mysql.py index 29bba214e..e762eb0da 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mysql.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_mysql.py @@ -150,6 +150,7 @@ def test_mysql_strategy_incremental_nothing_to_read(spark, processing, prepare_s # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -170,6 +171,7 @@ def test_mysql_strategy_incremental_nothing_to_read(spark, processing, prepare_s # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_oracle.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_oracle.py index 3e07546f4..2bb9165e6 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_oracle.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_oracle.py @@ -165,6 +165,7 @@ def test_oracle_strategy_incremental_nothing_to_read(spark, processing, prepare_ # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -185,6 +186,7 @@ def 
test_oracle_strategy_incremental_nothing_to_read(spark, processing, prepare_ # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_postgres.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_postgres.py index 424949751..8d5f73705 100644 --- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_postgres.py +++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_postgres.py @@ -252,6 +252,7 @@ def test_postgres_strategy_incremental_nothing_to_read(spark, processing, prepar # no data yet, nothing to read with IncrementalStrategy(): + assert not reader.has_data() df = reader.run() assert not df.count() @@ -272,6 +273,7 @@ def test_postgres_strategy_incremental_nothing_to_read(spark, processing, prepar # set hwm value to 50 with IncrementalStrategy(): + assert reader.has_data() df = reader.run() processing.assert_equal_df(df=df, other_frame=first_span, order_by="id_int") diff --git a/tests/tests_unit/tests_db_connection_unit/test_dialect_unit.py b/tests/tests_unit/tests_db_connection_unit/test_dialect_unit.py index 8faed9256..7dc7ac7d2 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_dialect_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_dialect_unit.py @@ -2,7 +2,7 @@ import pytest -from onetl.connection import Oracle, Postgres +from onetl.connection import MSSQL, Oracle, Postgres pytestmark = [pytest.mark.postgres] @@ -262,3 +262,41 @@ def test_db_dialect_get_sql_query_compact_true(spark_mock): ).strip() assert result == expected + + +@pytest.mark.parametrize( + "limit, where, expected_query", + [ + (None, None, "SELECT\n *\nFROM\n default.test"), + (0, None, "SELECT\n *\nFROM\n default.test\nWHERE\n 1=0"), + (5, None, "SELECT\n *\nFROM\n default.test\nWHERE\n ROWNUM <= 5"), + (None, "column1 = 'value'", "SELECT\n *\nFROM\n default.test\nWHERE\n column1 = 'value'"), + (0, "column1 = 'value'", "SELECT\n *\nFROM\n default.test\nWHERE\n 1=0"), + ( + 5, + "column1 = 'value'", + "SELECT\n *\nFROM\n default.test\nWHERE\n (column1 = 'value')\n AND\n (ROWNUM <= 5)", + ), + ], +) +def test_oracle_dialect_get_sql_query_limit_where(spark_mock, limit, where, expected_query): + conn = Oracle(host="some_host", user="user", sid="XE", password="passwd", spark=spark_mock) + result = conn.dialect.get_sql_query(table="default.test", limit=limit, where=where) + assert result.strip() == expected_query.strip() + + +@pytest.mark.parametrize( + "limit, where, expected_query", + [ + (None, None, "SELECT\n *\nFROM\n default.test"), + (0, None, "SELECT\n *\nFROM\n default.test\nWHERE\n 1 = 0"), + (5, None, "SELECT TOP 5\n *\nFROM\n default.test"), + (None, "column1 = 'value'", "SELECT\n *\nFROM\n default.test\nWHERE\n column1 = 'value'"), + (0, "column1 = 'value'", "SELECT\n *\nFROM\n default.test\nWHERE\n 1 = 0"), + (5, "column1 = 'value'", "SELECT TOP 5\n *\nFROM\n default.test\nWHERE\n column1 = 'value'"), + ], +) +def test_mssql_dialect_get_sql_query_limit_where(spark_mock, limit, where, expected_query): + conn = MSSQL(host="some_host", user="user", database="database", password="passwd", spark=spark_mock) + result = 
conn.dialect.get_sql_query(table="default.test", limit=limit, where=where) + assert result.strip() == expected_query.strip()
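
A minimal usage sketch of the new ``has_data`` / ``raise_if_no_data`` API under an incremental strategy, following the docstring warning above that HWM-based readers must call these methods inside a strategy context. The connection parameters, table name, HWM name, and the ``spark`` session are placeholders, and the ``ColumnIntHWM`` import path is assumed to match the HWM class used in the tests; treat this as an illustration, not part of the patch.

from etl_entities.hwm import ColumnIntHWM  # assumed import path for the HWM class used in the tests

from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

# placeholder connection; any supported DB connection works the same way
postgres = Postgres(
    host="example-host",
    user="user",
    password="password",
    database="db",
    spark=spark,  # an already created SparkSession
)

reader = DBReader(
    connection=postgres,
    source="schema.table",  # placeholder table
    hwm=ColumnIntHWM(name="example_hwm", expression="id_int"),
)

# an HWM is configured, so has_data()/raise_if_no_data() run inside the strategy context
with IncrementalStrategy():
    if reader.has_data():
        df = reader.run()
    else:
        # nothing new since the previous run
        ...

# alternatively, fail fast: raise_if_no_data() raises NoDataError when the window is empty
with IncrementalStrategy():
    reader.raise_if_no_data()
    df = reader.run()

Without an HWM configured, both methods can also be called outside any strategy, as in the README snapshot example above.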