diff --git a/docs/changelog/next_release/*.feature.rst b/docs/changelog/next_release/202.feature.rst
similarity index 100%
rename from docs/changelog/next_release/*.feature.rst
rename to docs/changelog/next_release/202.feature.rst
diff --git a/onetl/strategy/incremental_strategy.py b/onetl/strategy/incremental_strategy.py
index a4dd283c9..c5d26437d 100644
--- a/onetl/strategy/incremental_strategy.py
+++ b/onetl/strategy/incremental_strategy.py
@@ -294,33 +294,6 @@ class IncrementalStrategy(OffsetMixin, HWMStrategy):
         FROM public.mydata
         WHERE business_dt > CAST('2021-01-09' AS DATE); -- from HWM-offset (EXCLUDING first row)
 
-    Incremental run with :ref:`file-downloader` and ``hwm=FileListHWM(...)``:
-
-    .. code:: python
-
-        from onetl.connection import SFTP
-        from onetl.file import FileDownloader
-        from onetl.strategy import SnapshotStrategy
-        from etl_entities import FileListHWM
-
-        sftp = SFTP(
-            host="sftp.domain.com",
-            user="user",
-            password="*****",
-        )
-
-        downloader = FileDownloader(
-            connection=sftp,
-            source_path="/remote",
-            local_path="/local",
-            hwm=FileListHWM(name="some_hwm_name"),
-        )
-
-        with IncrementalStrategy():
-            df = downloader.run()
-
-        # current run will download only files which were not downloaded in previous runs
-
     Incremental run with :ref:`db-reader` and :ref:`kafka` connection
     (by ``offset`` in topic - :etl-entities:`KeyValueHWM`):
@@ -349,11 +322,39 @@ class IncrementalStrategy(OffsetMixin, HWMStrategy):
         reader = DBReader(
             connection=kafka,
             source="topic_name",
-            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="**offset**"),
+            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="offset"),
         )
 
         with IncrementalStrategy():
             df = reader.run()
+
+    Incremental run with :ref:`file-downloader` and ``hwm=FileListHWM(...)``:
+
+    .. code:: python
+
+        from onetl.connection import SFTP
+        from onetl.file import FileDownloader
+        from onetl.strategy import IncrementalStrategy
+        from etl_entities import FileListHWM
+
+        sftp = SFTP(
+            host="sftp.domain.com",
+            user="user",
+            password="*****",
+        )
+
+        downloader = FileDownloader(
+            connection=sftp,
+            source_path="/remote",
+            local_path="/local",
+            hwm=FileListHWM(name="some_hwm_name"),
+        )
+
+        with IncrementalStrategy():
+            df = downloader.run()
+
+        # current run will download only files which were not downloaded in previous runs
+
     """
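For context on the docstring above: with ``AutoDetectHWM(expression="offset")`` on a Kafka source, the tracked HWM value is a mapping of topic partition to the last processed offset, which is what lets repeated incremental runs skip already-read messages. A minimal sketch of how such a value advances between runs, using plain dicts and illustrative numbers rather than the actual etl-entities types:

.. code:: python

    # Illustrative only: the shape of a Kafka key-value HWM.
    hwm_value = {0: 100, 1: 95}  # partition -> last processed offset

    # Offsets observed during the next incremental read; partition 2 is new.
    seen = {0: 120, 1: 95, 2: 10}

    # After a successful run the HWM only ever moves forward per partition.
    hwm_value = {p: max(off, hwm_value.get(p, -1)) for p, off in seen.items()}
    assert hwm_value == {0: 120, 1: 95, 2: 10}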
diff --git a/tests/fixtures/processing/kafka.py b/tests/fixtures/processing/kafka.py
index 9a0c4a5af..bddb3490b 100644
--- a/tests/fixtures/processing/kafka.py
+++ b/tests/fixtures/processing/kafka.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import json
 import os
 from typing import TYPE_CHECKING
 
@@ -158,6 +159,11 @@ def delete_topic(self, topics: list[str], timeout: float = DEFAULT_TIMEOUT):
         # https://github.com/confluentinc/confluent-kafka-python/issues/813
         admin.delete_topics(topics, request_timeout=timeout)
 
+    def insert_pandas_df_into_topic(self, df: pandas.DataFrame, topic: str):
+        for _, row in df.iterrows():
+            message = json.dumps(row.to_dict())
+            self.send_message(topic, message.encode("utf-8"))
+
     def topic_exists(self, topic: str, timeout: float = DEFAULT_TIMEOUT) -> bool:
         admin = self.get_admin_client()
         topic_metadata = admin.list_topics(timeout=timeout)
diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_batch_strategy_integration/test_strategy_incremental_batch_kafka.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_batch_strategy_integration/test_strategy_incremental_batch_kafka.py
index 7ce7e6b51..b40568afd 100644
--- a/tests/tests_integration/tests_strategy_integration/tests_incremental_batch_strategy_integration/test_strategy_incremental_batch_kafka.py
+++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_batch_strategy_integration/test_strategy_incremental_batch_kafka.py
@@ -32,6 +32,7 @@ def test_strategy_kafka_with_batch_strategy_error(strategy, spark):
         table="topic",
         hwm=DBReader.AutoDetectHWM(name=secrets.token_hex(5), expression="offset"),
     )
-    with pytest.raises(TypeError, match=re.escape("unsupported operand type(s) for +: 'frozendict' and 'int'")):
+    # raises because the current version has no way to distribute a batch step size across kafka partitions
+    with pytest.raises(TypeError, match=re.escape("unsupported operand type(s) for +")):
         for _ in batches:
             reader.run()
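The assertion above is worth unpacking: batch strategies advance their window by computing ``hwm + step``, and for Kafka the HWM value is a ``frozendict`` of per-partition offsets, so adding an integer step to it is undefined. A small sketch of the failing operation, assuming the ``frozendict`` package and illustrative offsets:

.. code:: python

    from frozendict import frozendict

    hwm_value = frozendict({0: 100, 1: 95})  # partition -> last read offset
    step = 100  # window size a batch strategy would try to add

    try:
        hwm_value + step  # no rule for splitting the step across partitions
    except TypeError as exc:
        print(exc)  # unsupported operand type(s) for +: 'frozendict' and 'int'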
diff --git a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py
index 8063314ef..860a65d3d 100644
--- a/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py
+++ b/tests/tests_integration/tests_strategy_integration/tests_incremental_strategy_integration/test_strategy_increment_kafka.py
@@ -1,4 +1,3 @@
-import json
 import secrets
 
 import pytest
@@ -93,9 +92,7 @@ def test_kafka_strategy_incremental(
     second_span_max = len(second_span) + first_span_max
 
     # insert first span
-    for _, row in first_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(first_span, topic)
 
     # hwm is not in the store
     assert store.get_hwm(hwm_name) is None
@@ -114,9 +111,7 @@ def test_kafka_strategy_incremental(
     processing.assert_equal_df(df=deserialized_first_df, other_frame=first_span, order_by="id_int")
 
     # insert second span
-    for _, row in second_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(second_span, topic)
 
     with IncrementalStrategy():
         second_df = reader.run()
@@ -184,9 +179,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n
     assert sum(value for value in hwm.value.values()) == 0
 
     # insert first span
-    for _, row in first_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(first_span, topic)
 
     # .run() is not called - dataframe still empty - HWM not updated
     assert not df.count()
@@ -212,9 +205,7 @@ def test_kafka_strategy_incremental_nothing_to_read(spark, processing, schema, n
     assert sum(value for value in hwm.value.values()) == first_span_max
 
     # insert second span
-    for _, row in second_span.iterrows():
-        message = json.dumps(row.to_dict())
-        processing.send_message(topic, message.encode("utf-8"))
+    processing.insert_pandas_df_into_topic(second_span, topic)
 
     # .run() is not called - dataframe still empty - HWM not updated
     assert not df.count()
@@ -262,3 +253,32 @@ def test_kafka_strategy_incremental_wrong_hwm(
         source=topic,
         hwm=DBReader.AutoDetectHWM(name=hwm_name, expression=hwm_column),
     )
+
+
+def test_kafka_incremental_read_no_new_data(spark, processing):
+    topic = secrets.token_hex(6)
+    hwm_name = secrets.token_hex(5)
+
+    kafka = Kafka(
+        addresses=[f"{processing.host}:{processing.port}"],
+        cluster="cluster",
+        spark=spark,
+    )
+
+    reader = DBReader(
+        connection=kafka,
+        source=topic,
+        hwm=DBReader.AutoDetectHWM(name=hwm_name, expression="offset"),
+    )
+
+    first_span = processing.create_pandas_df(min_id=0, max_id=100)
+    processing.insert_pandas_df_into_topic(first_span, topic)
+
+    with IncrementalStrategy():
+        reader.run()
+
+    # no new data was added to kafka, so the second run should return an empty dataframe
+    with IncrementalStrategy():
+        new_data_df = reader.run()
+
+    assert new_data_df.rdd.isEmpty()
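One design note on the new test's final assertion: ``new_data_df.rdd.isEmpty()`` is a cheaper emptiness check than ``new_data_df.count() == 0``, because ``isEmpty`` stops as soon as it finds (or fails to find) a single row instead of counting every partition. A self-contained sketch of the two checks, using a hypothetical empty dataframe with column names borrowed from the fixtures:

.. code:: python

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # Hypothetical stand-in for an incremental run that found no new messages.
    df = spark.createDataFrame([], "id_int INT, text_string STRING")

    assert df.rdd.isEmpty()  # short-circuits after probing for a single row
    assert df.count() == 0   # same conclusion, but scans all partitions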