Skip to content

Commit

Permalink
Merge pull request #68 from MobileTeleSystems/feature/DOP-2349
Browse files Browse the repository at this point in the history
[DOP-2349] - Add ReadOptions, WriteOptions for Kafka connection
  • Loading branch information
maxim-lixakov authored Jul 13, 2023
2 parents f4affb6 + af89203 commit 8e060bc
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 0 deletions.
7 changes: 7 additions & 0 deletions onetl/connection/db_connection/kafka/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
from pydantic import SecretStr, root_validator, validator

from onetl.connection.db_connection.db_connection import DBConnection
from onetl.connection.db_connection.kafka.options import (
KafkaReadOptions,
KafkaWriteOptions,
)
from onetl.hwm import Statement
from onetl.impl import LocalPath, path_repr

Expand Down Expand Up @@ -118,6 +122,9 @@ class Kafka(DBConnection):
"""

ReadOptions = KafkaReadOptions
WriteOptions = KafkaWriteOptions

def read_source_as_df( # type: ignore
self,
source: str,
Expand Down
131 changes: 131 additions & 0 deletions onetl/connection/db_connection/kafka/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from onetl.impl import GenericOptions

# Options that the Kafka connection populates from its own attributes
# (topic, offsets, kafka.* client settings). Users must not override them
# via Read/WriteOptions, so they are rejected at validation time.
PROHIBITED_OPTIONS = frozenset(
    {
        "assign",
        "endingOffsets",
        "endingOffsetsByTimestamp",
        "kafka.*",
        "startingOffsets",
        "startingOffsetsByTimestamp",
        "startingOffsetsByTimestampStrategy",
        "startingTimestamp",
        "subscribe",
        "subscribePattern",
        "topic",
    },
)

# Spark Kafka source options known to be safe for reading.
# Unknown options are still allowed (extra = "allow"), these just skip the
# "unknown option" warning.
KNOWN_READ_OPTIONS = frozenset(
    {
        "endingTimestamp",
        "failOnDataLoss",
        "fetchOffset.numRetries",
        "fetchOffset.retryIntervalMs",
        "groupIdPrefix",
        "includeHeaders",
        "kafkaConsumer.pollTimeoutMs",
        "maxOffsetsPerTrigger",
        "maxTriggerDelay",
        "minOffsetsPerTrigger",
        "minPartitions",
    },
)

# Spark Kafka sink options known to be safe for writing.
KNOWN_WRITE_OPTIONS = frozenset({"includeHeaders"})


class KafkaReadOptions(GenericOptions):
    """Options accepted by the Kafka connector when reading data.

    .. note::

        Any option `supported by the connector
        <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_
        may be passed, even if it is not listed here.
        The exact set of supported options depends on the connector version.

    .. warning::

        The following options are populated from the connection attributes
        and therefore cannot be set (or overridden) through
        ``KafkaReadOptions``:

        * ``assign``
        * ``endingOffsets``
        * ``endingOffsetsByTimestamp``
        * ``kafka.*``
        * ``startingOffsets``
        * ``startingOffsetsByTimestamp``
        * ``startingOffsetsByTimestampStrategy``
        * ``startingTimestamp``
        * ``subscribe``
        * ``subscribePattern``
        * ``topic``

    Examples
    --------

    Read options initialization

    .. code:: python

        Kafka.ReadOptions(
            maxOffsetsPerTrigger=10000,
        )
    """

    class Config:
        # Reject connection-managed options, warn on unrecognized ones,
        # but still accept arbitrary extras for forward compatibility.
        extra = "allow"
        known_options = KNOWN_READ_OPTIONS
        prohibited_options = PROHIBITED_OPTIONS


class KafkaWriteOptions(GenericOptions):
    """Options accepted by the Kafka connector when writing data.

    .. note::

        Any option `supported by the connector
        <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_
        may be passed, even if it is not listed here.
        The exact set of supported options depends on the connector version.

    .. warning::

        The following options are populated from the connection attributes
        and therefore cannot be set (or overridden) through
        ``KafkaWriteOptions``:

        * ``assign``
        * ``endingOffsets``
        * ``endingOffsetsByTimestamp``
        * ``kafka.*``
        * ``startingOffsets``
        * ``startingOffsetsByTimestamp``
        * ``startingOffsetsByTimestampStrategy``
        * ``startingTimestamp``
        * ``subscribe``
        * ``subscribePattern``
        * ``topic``

    Examples
    --------

    Write options initialization

    .. code:: python

        options = Kafka.WriteOptions(
            includeHeaders=False,
        )
    """

    class Config:
        # Reject connection-managed options, warn on unrecognized ones,
        # but still accept arbitrary extras for forward compatibility.
        extra = "allow"
        known_options = KNOWN_WRITE_OPTIONS
        prohibited_options = PROHIBITED_OPTIONS
62 changes: 62 additions & 0 deletions tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path

import pytest
from pydantic import ValidationError

from onetl.connection import Kafka

Expand All @@ -28,6 +29,67 @@ def test_kafka_jars(spark_version, scala_version_input, scala_version_real):
) == [f"org.apache.spark:spark-sql-kafka-0-10_{scala_version_real}:{spark_version}"]


@pytest.mark.parametrize(
    "arg, value",
    [
        ("assign", "assign_value"),
        ("subscribe", "subscribe_value"),
        ("subscribePattern", "subscribePattern_value"),
        ("startingOffsets", "startingOffsets_value"),
        ("startingOffsetsByTimestamp", "startingOffsetsByTimestamp_value"),
        ("startingTimestamp", "startingTimestamp_value"),
        ("endingOffsets", "endingOffsets_value"),
        ("endingOffsetsByTimestamp", "endingOffsetsByTimestamp_value"),
        ("startingOffsetsByTimestampStrategy", "startingOffsetsByTimestampStrategy_value"),
        ("kafka.bootstrap.servers", "kafka.bootstrap.servers_value"),
        ("kafka.group.id", "kafka.group.id_value"),
        ("topic", "topic_value"),
    ],
)
def test_kafka_prohibited_options_error(arg, value):
    """Each connection-managed option must be rejected by both option classes."""
    for options_class in (Kafka.ReadOptions, Kafka.WriteOptions):
        expected = rf"Options \['{arg}'\] are not allowed to use in a {options_class.__name__}"
        with pytest.raises(ValueError, match=expected):
            options_class(**{arg: value})


@pytest.mark.parametrize(
    "arg, value",
    [
        ("failOnDataLoss", "false"),
        ("kafkaConsumer.pollTimeoutMs", "30000"),
        ("fetchOffset.numRetries", "3"),
        ("fetchOffset.retryIntervalMs", "1000"),
        ("maxOffsetsPerTrigger", "1000"),
        ("minOffsetsPerTrigger", "500"),
        ("maxTriggerDelay", "2000"),
        ("minPartitions", "2"),
        ("groupIdPrefix", "testPrefix"),
        ("includeHeaders", "true"),
    ],
)
def test_kafka_allowed_read_options_no_error(arg, value):
    """Known read options are accepted without raising.

    pytest already fails a test on any unexpected exception, so the former
    try/except + ``pytest.fail`` wrapper was redundant and only obscured
    the original traceback on failure.
    """
    Kafka.ReadOptions(**{arg: value})


@pytest.mark.parametrize(
    "arg, value",
    [
        ("includeHeaders", "true"),
    ],
)
def test_kafka_allowed_write_options_no_error(arg, value):
    """Known write options are accepted without raising.

    pytest already fails a test on any unexpected exception, so the former
    try/except + ``pytest.fail`` wrapper was redundant and only obscured
    the original traceback on failure.
    """
    Kafka.WriteOptions(**{arg: value})


def test_kafka_auth(spark_mock):
# Act
conn = Kafka(
Expand Down

0 comments on commit 8e060bc

Please sign in to comment.