[DOP-11752] - add support of incremental strategy in Kafka #202

Merged: 5 commits, Jan 29, 2024
12 changes: 12 additions & 0 deletions docs/changelog/next_release/202.feature.rst
@@ -0,0 +1,12 @@
Add support for ``Incremental Strategies`` for the ``Kafka`` connection. This lets you resume reading data from a Kafka topic starting from the offsets saved by the previous run.

.. code-block:: python

reader = DBReader(
connection=Kafka(...),
source="topic_name",
hwm=AutoDetectHWM(name="some_hwm_name", expression="offset"),
)

with IncrementalStrategy():
df = reader.run()
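
A minimal sketch of the resulting behavior across consecutive runs (the offset values below are purely illustrative and not part of this PR):

.. code-block:: python

    with IncrementalStrategy():
        df = reader.run()  # first run reads the whole topic and saves per-partition offsets, e.g. {0: 100, 1: 80}, to the HWM store

    # ... new messages are produced to the topic ...

    with IncrementalStrategy():
        df = reader.run()  # next run reads only messages written after the saved offsets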
4 changes: 0 additions & 4 deletions docs/connection/db_connection/kafka/read.rst
@@ -5,10 +5,6 @@ Reading from Kafka

For reading data from Kafka, use :obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>` with specific options (see below).

.. warning::

Currently, Kafka does not support :ref:`strategy`. You can only read the **whole** topic.

.. note::

Unlike other connection classes, Kafka always returns a dataframe with a fixed schema
29 changes: 28 additions & 1 deletion onetl/connection/db_connection/kafka/connection.py
@@ -14,6 +14,7 @@

from __future__ import annotations

import json
import logging
from contextlib import closing
from typing import TYPE_CHECKING, Any, List, Optional
@@ -258,7 +259,7 @@
return self

@slot
def read_source_as_df(
def read_source_as_df( # noqa: WPS231
self,
source: str,
columns: list[str] | None = None,
@@ -276,7 +277,30 @@
result_options = {f"kafka.{key}": value for key, value in self._get_connection_properties().items()}
result_options.update(options.dict(by_alias=True, exclude_none=True))
result_options["subscribe"] = source

if window and window.expression == "offset":
# the 'including' flag in window values is relevant only for batch strategies, which are not
# supported by Kafka, so we always read offsets including the border values
starting_offsets = dict(window.start_from.value) if window.start_from.value else {}
ending_offsets = dict(window.stop_at.value) if window.stop_at.value else {}

# if the number of partitions in the topic has increased since the previous run,
# new partitions are present in ending_offsets but not in starting_offsets;
# assign them the default offset 0 so they are read from the beginning
for partition in ending_offsets:
if partition not in starting_offsets:
starting_offsets[partition] = 0

if starting_offsets:
result_options["startingOffsets"] = json.dumps({source: starting_offsets})
if ending_offsets:
result_options["endingOffsets"] = json.dumps({source: ending_offsets})

df = self.spark.read.format("kafka").options(**result_options).load()

if limit is not None:
df = df.limit(limit)

log.info("|%s| Dataframe is successfully created.", self.__class__.__name__)
return df

@@ -471,6 +495,9 @@
self,
source: str,
window: Window,
hint: Any | None = None,
where: Any | None = None,
options: KafkaReadOptions | dict | None = None,
) -> tuple[dict[int, int], dict[int, int]]:
log.info("|%s| Getting min and max offset values for topic %r ...", self.__class__.__name__, source)

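
For illustration, a minimal sketch (topic name, partition numbers, and offsets are hypothetical) of the ``startingOffsets`` / ``endingOffsets`` options that the logic above passes to the Spark Kafka source:

import json

# previous HWM offsets per partition; partition 2 appeared after the last run, so it defaults to 0
starting_offsets = {0: 100, 1: 80, 2: 0}
# current maximum offsets per partition at the time of the read
ending_offsets = {0: 150, 1: 90, 2: 10}

options = {
    "subscribe": "topic_name",
    "startingOffsets": json.dumps({"topic_name": starting_offsets}),
    "endingOffsets": json.dumps({"topic_name": ending_offsets}),
}
print(options["startingOffsets"])  # {"topic_name": {"0": 100, "1": 80, "2": 0}}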
10 changes: 8 additions & 2 deletions onetl/connection/db_connection/kafka/dialect.py
@@ -27,7 +27,6 @@
NotSupportDFSchema,
NotSupportHint,
NotSupportWhere,
SupportNameAny,
)

if TYPE_CHECKING:
@@ -41,11 +40,18 @@ class KafkaDialect( # noqa: WPS215
NotSupportDFSchema,
NotSupportHint,
NotSupportWhere,
SupportNameAny,
DBDialect,
):
SUPPORTED_HWM_COLUMNS = {"offset"}

def validate_name(self, value: str) -> str:
if "*" in value or "," in value:
raise ValueError(
f"source/target={value} is not supported by {self.connection.__class__.__name__}. "
f"Provide a singular topic.",
)
return value

def validate_hwm(
self,
hwm: HWM | None,
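
A brief sketch of the intended validation behavior (the ``dialect`` instance below is hypothetical; only a single plain topic name is accepted):

dialect.validate_name("orders")           # returns "orders"
dialect.validate_name("orders,payments")  # raises ValueError: provide a single topic name
dialect.validate_name("orders*")          # raises ValueError: wildcard subscriptions are not supported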
2 changes: 1 addition & 1 deletion onetl/db/db_reader/db_reader.py
@@ -660,7 +660,7 @@ def _get_hwm_field(self, hwm: HWM) -> StructField:
log.info("|%s| Got Spark field: %s", self.__class__.__name__, result)
return result

def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]:
def _calculate_window_and_limit(self) -> tuple[Window | None, int | None]: # noqa: WPS231
if not self.hwm:
# SnapshotStrategy - always select all the data from source
return None, None
3 changes: 3 additions & 0 deletions onetl/strategy/batch_hwm_strategy.py
@@ -142,6 +142,9 @@ def check_hwm_increased(self, next_value: Any) -> None:
@property
def next(self) -> Edge:
if self.current.is_set():
if not hasattr(self.current.value, "__add__"):
raise RuntimeError(f"HWM: {self.hwm!r} cannot be used with Batch strategies")

result = Edge(value=self.current.value + self.step)
else:
result = Edge(value=self.stop)
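
The ``__add__`` check above exists because a Kafka HWM value is a per-partition mapping of offsets, which cannot be advanced by a fixed numeric step; a rough illustration:

hasattr({0: 100, 1: 80}, "__add__")  # False -> such an HWM raises RuntimeError under batch strategies
hasattr(100, "__add__")              # True  -> integer HWMs can compute current.value + step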
34 changes: 34 additions & 0 deletions onetl/strategy/incremental_strategy.py
@@ -294,6 +294,40 @@ class IncrementalStrategy(OffsetMixin, HWMStrategy):
FROM public.mydata
WHERE business_dt > CAST('2021-01-09' AS DATE); -- from HWM-offset (EXCLUDING first row)

Incremental run with :ref:`db-reader` and :ref:`kafka` connection
(by ``offset`` in topic - :etl-entities:`KeyValueHWM <hwm/key_value/index.html>`):

.. code:: python

from onetl.connection import Kafka
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
from onetl.hwm import AutoDetectHWM

from pyspark.sql import SparkSession

maven_packages = Kafka.get_packages()
spark = (
SparkSession.builder.appName("spark-app-name")
.config("spark.jars.packages", ",".join(maven_packages))
.getOrCreate()
)

kafka = Kafka(
addresses=["mybroker:9092", "anotherbroker:9092"],
cluster="my-cluster",
spark=spark,
)

reader = DBReader(
connection=kafka,
source="topic_name",
hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="offset"),
)

with IncrementalStrategy():
df = reader.run()

Incremental run with :ref:`file-downloader` and ``hwm=FileListHWM(...)``:

.. code:: python
48 changes: 48 additions & 0 deletions tests/fixtures/processing/kafka.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
import os
from typing import TYPE_CHECKING

@@ -17,6 +18,17 @@
class KafkaProcessing(BaseProcessing):
column_names: list[str] = ["id_int", "text_string", "hwm_int", "float_value"]

def __enter__(self):
return self

def __exit__(self, _exc_type, _exc_value, _traceback):
return False

@property
def schema(self) -> str:
# Kafka does not support schemas
return ""

def get_consumer(self):
from confluent_kafka import Consumer

@@ -111,11 +123,47 @@ def get_expected_df(self, topic: str, num_messages: int = 1, timeout: float = DEFAULT_TIMEOUT):
def insert_data(self, schema: str, table: str, values: list) -> None:
pass

def change_topic_partitions(self, topic: str, num_partitions: int, timeout: float = DEFAULT_TIMEOUT):
from confluent_kafka.admin import NewPartitions

admin_client = self.get_admin_client()

if not self.topic_exists(topic):
self.create_topic(topic, num_partitions)
else:
new_partitions = [NewPartitions(topic, num_partitions)]
# change the number of partitions
fs = admin_client.create_partitions(new_partitions, request_timeout=timeout)

for topic, f in fs.items():
try:
f.result()
except Exception as e:
raise Exception(f"Failed to update number of partitions for topic '{topic}': {e}") # noqa: WPS454

def create_topic(self, topic: str, num_partitions: int, timeout: float = DEFAULT_TIMEOUT):
from confluent_kafka.admin import KafkaException, NewTopic

admin_client = self.get_admin_client()
topic_config = NewTopic(topic, num_partitions=num_partitions, replication_factor=1)
fs = admin_client.create_topics([topic_config], request_timeout=timeout)

for topic, f in fs.items():
try:
f.result()
except Exception as e:
raise KafkaException(f"Error creating topic '{topic}': {e}")

def delete_topic(self, topics: list[str], timeout: float = DEFAULT_TIMEOUT):
admin = self.get_admin_client()
# https://github.com/confluentinc/confluent-kafka-python/issues/813
admin.delete_topics(topics, request_timeout=timeout)

def insert_pandas_df_into_topic(self, df: pandas.DataFrame, topic: str):
for _, row in df.iterrows():
message = json.dumps(row.to_dict())
self.send_message(topic, message.encode("utf-8"))

def topic_exists(self, topic: str, timeout: float = DEFAULT_TIMEOUT) -> bool:
admin = self.get_admin_client()
topic_metadata = admin.list_topics(timeout=timeout)
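
A minimal sketch of how these fixture helpers might be combined in an integration test (topic name and row contents are made up for illustration):

import pandas

with KafkaProcessing() as processing:
    processing.create_topic("test_topic", num_partitions=1)
    df = pandas.DataFrame([{"id_int": 1, "text_string": "a", "hwm_int": 1, "float_value": 1.0}])
    processing.insert_pandas_df_into_topic(df, "test_topic")
    # simulate the partition count growing between incremental runs
    processing.change_topic_partitions("test_topic", num_partitions=3)
    processing.delete_topic(["test_topic"])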
@@ -21,7 +21,7 @@ def test_strategy_kafka_with_batch_strategy_error(strategy, spark):

processing = KafkaProcessing()

with strategy(step=10):
with strategy(step=10) as batches:
reader = DBReader(
connection=Kafka(
addresses=[f"{processing.host}:{processing.port}"],
@@ -31,5 +31,10 @@ def test_strategy_kafka_with_batch_strategy_error(strategy, spark):
table="topic",
hwm=DBReader.AutoDetectHWM(name=secrets.token_hex(5), expression="offset"),
)
with pytest.raises(RuntimeError):
reader.run()
# raises because there is currently no way to distribute the step size across Kafka partitions
with pytest.raises(
RuntimeError,
match=r"HWM: .* cannot be used with Batch strategies",
):
for _ in batches:
reader.run()