From a4484dfb0c2006f87f31774ba408839453b83885 Mon Sep 17 00:00:00 2001 From: mvliksako1 Date: Wed, 12 Jul 2023 20:31:40 +0300 Subject: [PATCH 1/6] [DOP-2349] - Add ReadOptions, WriteOptions for Kafka connection --- .../db_connection/kafka/connection.py | 122 +++++++++++++++++- .../test_kafka_unit.py | 62 +++++++++ 2 files changed, 183 insertions(+), 1 deletion(-) diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 1d52ef8a0..3a87e2759 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -26,7 +26,7 @@ from onetl.connection.db_connection.db_connection import DBConnection from onetl.hwm import Statement -from onetl.impl import LocalPath, path_repr +from onetl.impl import GenericOptions, LocalPath, path_repr if TYPE_CHECKING: from pyspark.sql import DataFrame @@ -35,6 +35,56 @@ log = logging.getLogger(__name__) +PROHIBITED_OPTIONS = frozenset( + ( + "assign", + "subscribe", + "subscribePattern", + "startingOffsets", + "startingOffsetsByTimestamp", + "startingTimestamp", + "endingOffsets", + "endingOffsetsByTimestamp", + "endingOffsets", + "startingOffsetsByTimestampStrategy", + "kafka.bootstrap.servers", + "kafka.group.id", + "topic", + ), +) + +KNOWN_READ_OPTIONS = frozenset( + ( + "assign", + "subscribe", + "subscribePattern", + "kafka.bootstrap.servers", + "startingTimestamp", + "startingOffsetsByTimestamp", + "startingOffsets", + "endingTimestamp", + "endingOffsetsByTimestamp", + "endingOffsets", + "failOnDataLoss", + "kafkaConsumer.pollTimeoutMs", + "fetchOffset.numRetries", + "fetchOffset.retryIntervalMs", + "maxOffsetsPerTrigger", + "minOffsetsPerTrigger", + "maxTriggerDelay", + "minPartitions", + "groupIdPrefix", + "kafka.group.id", + "includeHeaders", + "startingOffsetsByTimestampStrategy", + ), +) + +KNOWN_WRITE_OPTIONS = frozenset( + ("includeHeaders",), +) + + class Kafka(DBConnection): """ This connector is designed to read and write from Kafka in batch mode. @@ -118,6 +168,76 @@ class Kafka(DBConnection): """ + class ReadOptions(GenericOptions): + """Reading options for Kafka connector. + + .. note :: + + You can pass any value + `supported by connector `_, + even if it is not mentioned in this documentation. + + The set of supported options depends on connector version. + + .. warning:: + + Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, + ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, + ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection + attributes, and cannot be set in ``KafkaReadOptions`` class. + + Examples + -------- + + Read options initialization + + .. code:: python + + Kafka.ReadOptions( + maxOffsetsPerTrigger=10000, + ) + """ + + class Config: + prohibited_options = PROHIBITED_OPTIONS + known_options = KNOWN_READ_OPTIONS + extra = "allow" + + class WriteOptions(GenericOptions): + """Writing options for Kafka connector. + + .. note :: + + You can pass any value + `supported by connector `_, + even if it is not mentioned in this documentation. + + The set of supported options depends on connector version. + + .. warning:: + + Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, + ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, + ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection + attributes, and cannot be set in ``KafkaReadOptions`` class. + + Examples + -------- + + Write options initialization + + .. code:: python + + options = Kafka.WriteOptions( + includeHeaders=False, + ) + """ + + class Config: + prohibited_options = PROHIBITED_OPTIONS + known_options = KNOWN_WRITE_OPTIONS + extra = "allow" + def read_source_as_df( # type: ignore self, source: str, diff --git a/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py b/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py index ec6e7244b..9ebbf9a4f 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py @@ -3,6 +3,7 @@ from pathlib import Path import pytest +from pydantic import ValidationError from onetl.connection import Kafka @@ -28,6 +29,67 @@ def test_kafka_jars(spark_version, scala_version_input, scala_version_real): ) == [f"org.apache.spark:spark-sql-kafka-0-10_{scala_version_real}:{spark_version}"] +@pytest.mark.parametrize( + "arg, value", + [ + ("assign", "assign_value"), + ("subscribe", "subscribe_value"), + ("subscribePattern", "subscribePattern_value"), + ("startingOffsets", "startingOffsets_value"), + ("startingOffsetsByTimestamp", "startingOffsetsByTimestamp_value"), + ("startingTimestamp", "startingTimestamp_value"), + ("endingOffsets", "endingOffsets_value"), + ("endingOffsetsByTimestamp", "endingOffsetsByTimestamp_value"), + ("startingOffsetsByTimestampStrategy", "startingOffsetsByTimestampStrategy_value"), + ("kafka.bootstrap.servers", "kafka.bootstrap.servers_value"), + ("kafka.group.id", "kafka.group.id_value"), + ("topic", "topic_value"), + ], +) +def test_kafka_prohibited_options_error(arg, value): + error_msg = rf"Options \['{arg}'\] are not allowed to use in a ReadOptions" + with pytest.raises(ValueError, match=error_msg): + Kafka.ReadOptions(**{arg: value}) + error_msg = rf"Options \['{arg}'\] are not allowed to use in a WriteOptions" + with pytest.raises(ValueError, match=error_msg): + Kafka.WriteOptions(**{arg: value}) + + +@pytest.mark.parametrize( + "arg, value", + [ + ("failOnDataLoss", "false"), + ("kafkaConsumer.pollTimeoutMs", "30000"), + ("fetchOffset.numRetries", "3"), + ("fetchOffset.retryIntervalMs", "1000"), + ("maxOffsetsPerTrigger", "1000"), + ("minOffsetsPerTrigger", "500"), + ("maxTriggerDelay", "2000"), + ("minPartitions", "2"), + ("groupIdPrefix", "testPrefix"), + ("includeHeaders", "true"), + ], +) +def test_kafka_allowed_read_options_no_error(arg, value): + try: + Kafka.ReadOptions(**{arg: value}) + except ValidationError: + pytest.fail("ValidationError for ReadOptions raised unexpectedly!") + + +@pytest.mark.parametrize( + "arg, value", + [ + ("includeHeaders", "true"), + ], +) +def test_kafka_allowed_write_options_no_error(arg, value): + try: + Kafka.WriteOptions(**{arg: value}) + except ValidationError: + pytest.fail("ValidationError for Write options raised unexpectedly!") + + def test_kafka_auth(spark_mock): # Act conn = Kafka( From 89c35398406b37ce7ad31aeee1bf54cfebd26b7e Mon Sep 17 00:00:00 2001 From: Maxim Liksakov <67663774+maxim-lixakov@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:16:30 +0300 Subject: [PATCH 2/6] Update onetl/connection/db_connection/kafka/connection.py Co-authored-by: Maxim Martynov --- onetl/connection/db_connection/kafka/connection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 3a87e2759..1d701d82c 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -47,8 +47,7 @@ "endingOffsetsByTimestamp", "endingOffsets", "startingOffsetsByTimestampStrategy", - "kafka.bootstrap.servers", - "kafka.group.id", + "kafka.*", "topic", ), ) From 1b25879e55be3631d422eedf7eabc758aa2f948b Mon Sep 17 00:00:00 2001 From: Maxim Liksakov <67663774+maxim-lixakov@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:16:41 +0300 Subject: [PATCH 3/6] Update onetl/connection/db_connection/kafka/connection.py Co-authored-by: Maxim Martynov --- onetl/connection/db_connection/kafka/connection.py | 1 - 1 file changed, 1 deletion(-) diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 1d701d82c..ff3f17ca3 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -57,7 +57,6 @@ "assign", "subscribe", "subscribePattern", - "kafka.bootstrap.servers", "startingTimestamp", "startingOffsetsByTimestamp", "startingOffsets", From 7af828070ba09f31f0a868b8ccf97744410a3a85 Mon Sep 17 00:00:00 2001 From: mvliksako1 Date: Thu, 13 Jul 2023 12:34:10 +0300 Subject: [PATCH 4/6] [DOP-2349] - options moved to seperate file --- .../db_connection/kafka/connection.py | 125 +----------------- .../connection/db_connection/kafka/options.py | 110 +++++++++++++++ .../test_kafka_unit.py | 4 +- 3 files changed, 119 insertions(+), 120 deletions(-) create mode 100644 onetl/connection/db_connection/kafka/options.py diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index ff3f17ca3..f3ee6f6db 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -25,8 +25,12 @@ from pydantic import SecretStr, root_validator, validator from onetl.connection.db_connection.db_connection import DBConnection +from onetl.connection.db_connection.kafka.options import ( + KafkaReadOptions, + KafkaWriteOptions, +) from onetl.hwm import Statement -from onetl.impl import GenericOptions, LocalPath, path_repr +from onetl.impl import LocalPath, path_repr if TYPE_CHECKING: from pyspark.sql import DataFrame @@ -35,54 +39,6 @@ log = logging.getLogger(__name__) -PROHIBITED_OPTIONS = frozenset( - ( - "assign", - "subscribe", - "subscribePattern", - "startingOffsets", - "startingOffsetsByTimestamp", - "startingTimestamp", - "endingOffsets", - "endingOffsetsByTimestamp", - "endingOffsets", - "startingOffsetsByTimestampStrategy", - "kafka.*", - "topic", - ), -) - -KNOWN_READ_OPTIONS = frozenset( - ( - "assign", - "subscribe", - "subscribePattern", - "startingTimestamp", - "startingOffsetsByTimestamp", - "startingOffsets", - "endingTimestamp", - "endingOffsetsByTimestamp", - "endingOffsets", - "failOnDataLoss", - "kafkaConsumer.pollTimeoutMs", - "fetchOffset.numRetries", - "fetchOffset.retryIntervalMs", - "maxOffsetsPerTrigger", - "minOffsetsPerTrigger", - "maxTriggerDelay", - "minPartitions", - "groupIdPrefix", - "kafka.group.id", - "includeHeaders", - "startingOffsetsByTimestampStrategy", - ), -) - -KNOWN_WRITE_OPTIONS = frozenset( - ("includeHeaders",), -) - - class Kafka(DBConnection): """ This connector is designed to read and write from Kafka in batch mode. @@ -166,75 +122,8 @@ class Kafka(DBConnection): """ - class ReadOptions(GenericOptions): - """Reading options for Kafka connector. - - .. note :: - - You can pass any value - `supported by connector `_, - even if it is not mentioned in this documentation. - - The set of supported options depends on connector version. - - .. warning:: - - Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, - ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, - ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection - attributes, and cannot be set in ``KafkaReadOptions`` class. - - Examples - -------- - - Read options initialization - - .. code:: python - - Kafka.ReadOptions( - maxOffsetsPerTrigger=10000, - ) - """ - - class Config: - prohibited_options = PROHIBITED_OPTIONS - known_options = KNOWN_READ_OPTIONS - extra = "allow" - - class WriteOptions(GenericOptions): - """Writing options for Kafka connector. - - .. note :: - - You can pass any value - `supported by connector `_, - even if it is not mentioned in this documentation. - - The set of supported options depends on connector version. - - .. warning:: - - Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, - ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, - ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection - attributes, and cannot be set in ``KafkaReadOptions`` class. - - Examples - -------- - - Write options initialization - - .. code:: python - - options = Kafka.WriteOptions( - includeHeaders=False, - ) - """ - - class Config: - prohibited_options = PROHIBITED_OPTIONS - known_options = KNOWN_WRITE_OPTIONS - extra = "allow" + ReadOptions = KafkaReadOptions + WriteOptions = KafkaWriteOptions def read_source_as_df( # type: ignore self, diff --git a/onetl/connection/db_connection/kafka/options.py b/onetl/connection/db_connection/kafka/options.py new file mode 100644 index 000000000..7b6d36bd4 --- /dev/null +++ b/onetl/connection/db_connection/kafka/options.py @@ -0,0 +1,110 @@ +from onetl.impl import GenericOptions + +PROHIBITED_OPTIONS = frozenset( + ( + "assign", + "subscribe", + "subscribePattern", + "startingOffsets", + "startingOffsetsByTimestamp", + "startingTimestamp", + "endingOffsets", + "endingOffsetsByTimestamp", + "endingOffsets", + "startingOffsetsByTimestampStrategy", + "kafka.*", + "topic", + ), +) + +KNOWN_READ_OPTIONS = frozenset( + ( + "maxTriggerDelay", + "minOffsetsPerTrigger", + "maxOffsetsPerTrigger", + "kafkaConsumer.pollTimeoutMs", + "fetchOffset.numRetries", + "minPartitions", + "failOnDataLoss", + "includeHeaders", + "fetchOffset.retryIntervalMs", + "groupIdPrefix", + "endingTimestamp", + ), +) + +KNOWN_WRITE_OPTIONS = frozenset( + ("includeHeaders",), +) + + +class KafkaReadOptions(GenericOptions): + """Reading options for Kafka connector. + + .. note :: + + You can pass any value + `supported by connector `_, + even if it is not mentioned in this documentation. + + The set of supported options depends on connector version. + + .. warning:: + + Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, + ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, + ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection + attributes, and cannot be set in ``KafkaReadOptions`` class. + + Examples + -------- + + Read options initialization + + .. code:: python + + Kafka.ReadOptions( + maxOffsetsPerTrigger=10000, + ) + """ + + class Config: + prohibited_options = PROHIBITED_OPTIONS + known_options = KNOWN_READ_OPTIONS + extra = "allow" + + +class KafkaWriteOptions(GenericOptions): + """Writing options for Kafka connector. + + .. note :: + + You can pass any value + `supported by connector `_, + even if it is not mentioned in this documentation. + + The set of supported options depends on connector version. + + .. warning:: + + Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, + ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, + ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection + attributes, and cannot be set in ``KafkaWriteOptions`` class. + + Examples + -------- + + Write options initialization + + .. code:: python + + options = Kafka.WriteOptions( + includeHeaders=False, + ) + """ + + class Config: + prohibited_options = PROHIBITED_OPTIONS + known_options = KNOWN_WRITE_OPTIONS + extra = "allow" diff --git a/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py b/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py index 9ebbf9a4f..873015d7c 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py @@ -47,10 +47,10 @@ def test_kafka_jars(spark_version, scala_version_input, scala_version_real): ], ) def test_kafka_prohibited_options_error(arg, value): - error_msg = rf"Options \['{arg}'\] are not allowed to use in a ReadOptions" + error_msg = rf"Options \['{arg}'\] are not allowed to use in a KafkaReadOptions" with pytest.raises(ValueError, match=error_msg): Kafka.ReadOptions(**{arg: value}) - error_msg = rf"Options \['{arg}'\] are not allowed to use in a WriteOptions" + error_msg = rf"Options \['{arg}'\] are not allowed to use in a KafkaWriteOptions" with pytest.raises(ValueError, match=error_msg): Kafka.WriteOptions(**{arg: value}) From 46d74975e19f61c8704fe54883152b1c1fbcd46a Mon Sep 17 00:00:00 2001 From: mvliksako1 Date: Thu, 13 Jul 2023 13:57:42 +0300 Subject: [PATCH 5/6] [DOP-2349] - updated docstrings in kafka.options --- .../connection/db_connection/kafka/options.py | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/onetl/connection/db_connection/kafka/options.py b/onetl/connection/db_connection/kafka/options.py index 7b6d36bd4..69eb87ed1 100644 --- a/onetl/connection/db_connection/kafka/options.py +++ b/onetl/connection/db_connection/kafka/options.py @@ -3,33 +3,34 @@ PROHIBITED_OPTIONS = frozenset( ( "assign", - "subscribe", - "subscribePattern", - "startingOffsets", - "startingOffsetsByTimestamp", - "startingTimestamp", "endingOffsets", - "endingOffsetsByTimestamp", "endingOffsets", - "startingOffsetsByTimestampStrategy", + "endingOffsetsByTimestamp", "kafka.*", + "startingOffsets", + "startingOffsets", + "startingOffsetsByTimestamp", + "startingOffsetsByTimestampStrategy", + "startingTimestamp", + "subscribe", + "subscribePattern", "topic", ), ) KNOWN_READ_OPTIONS = frozenset( ( - "maxTriggerDelay", - "minOffsetsPerTrigger", - "maxOffsetsPerTrigger", - "kafkaConsumer.pollTimeoutMs", - "fetchOffset.numRetries", - "minPartitions", + "endingTimestamp", "failOnDataLoss", - "includeHeaders", + "fetchOffset.numRetries", "fetchOffset.retryIntervalMs", "groupIdPrefix", - "endingTimestamp", + "includeHeaders", + "kafkaConsumer.pollTimeoutMs", + "maxOffsetsPerTrigger", + "maxTriggerDelay", + "minOffsetsPerTrigger", + "minPartitions", ), ) @@ -51,10 +52,11 @@ class KafkaReadOptions(GenericOptions): .. warning:: - Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, - ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, - ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection - attributes, and cannot be set in ``KafkaReadOptions`` class. + Options ``["assign", "endingOffsets", "endingOffsets", "endingOffsetsByTimestamp", "kafka.*", + "startingOffsets", "startingOffsets", "startingOffsetsByTimestamp", "startingOffsetsByTimestampStrategy", + "startingTimestamp", "subscribe", "subscribePattern", "topic"]`` are populated from connection + attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden by the user to avoid + duplicating them in the documentation. Examples -------- @@ -87,10 +89,11 @@ class KafkaWriteOptions(GenericOptions): .. warning:: - Options ``kafka.*``, ``assign``, ``subscribe``, ``subscribePattern``, ``startingOffsets``, - ``startingOffsetsByTimestamp``, ``startingTimestamp``, ``endingOffsets``, ``endingOffsetsByTimestamp``, - ``endingOffsets``, ``startingOffsetsByTimestampStrategy``, ``topic`` are populated from connection - attributes, and cannot be set in ``KafkaWriteOptions`` class. + Options ``["assign", "endingOffsets", "endingOffsets", "endingOffsetsByTimestamp", "kafka.*", + "startingOffsets", "startingOffsets", "startingOffsetsByTimestamp", "startingOffsetsByTimestampStrategy", + "startingTimestamp", "subscribe", "subscribePattern", "topic"]`` are populated from connection + attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden by the user to avoid + duplicating them in the documentation. Examples -------- From af89203350f3ef0542e8712081170a8dbfcbafda Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Thu, 13 Jul 2023 14:29:07 +0300 Subject: [PATCH 6/6] [DOP-2349] - updated docstrings in kafka.options --- .../connection/db_connection/kafka/options.py | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/onetl/connection/db_connection/kafka/options.py b/onetl/connection/db_connection/kafka/options.py index 69eb87ed1..f6b493f96 100644 --- a/onetl/connection/db_connection/kafka/options.py +++ b/onetl/connection/db_connection/kafka/options.py @@ -4,11 +4,9 @@ ( "assign", "endingOffsets", - "endingOffsets", "endingOffsetsByTimestamp", "kafka.*", "startingOffsets", - "startingOffsets", "startingOffsetsByTimestamp", "startingOffsetsByTimestampStrategy", "startingTimestamp", @@ -52,11 +50,21 @@ class KafkaReadOptions(GenericOptions): .. warning:: - Options ``["assign", "endingOffsets", "endingOffsets", "endingOffsetsByTimestamp", "kafka.*", - "startingOffsets", "startingOffsets", "startingOffsetsByTimestamp", "startingOffsetsByTimestampStrategy", - "startingTimestamp", "subscribe", "subscribePattern", "topic"]`` are populated from connection - attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden by the user to avoid - duplicating them in the documentation. + Options: + * ``assign`` + * ``endingOffsets`` + * ``endingOffsetsByTimestamp`` + * ``kafka.*`` + * ``startingOffsets`` + * ``startingOffsetsByTimestamp`` + * ``startingOffsetsByTimestampStrategy`` + * ``startingTimestamp`` + * ``subscribe`` + * ``subscribePattern`` + * ``topic`` + + populated from connection attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden + by the user to avoid issues. Examples -------- @@ -89,11 +97,21 @@ class KafkaWriteOptions(GenericOptions): .. warning:: - Options ``["assign", "endingOffsets", "endingOffsets", "endingOffsetsByTimestamp", "kafka.*", - "startingOffsets", "startingOffsets", "startingOffsetsByTimestamp", "startingOffsetsByTimestampStrategy", - "startingTimestamp", "subscribe", "subscribePattern", "topic"]`` are populated from connection - attributes, and cannot be set in ``KafkaReadOptions`` class and be overridden by the user to avoid - duplicating them in the documentation. + Options: + * ``assign`` + * ``endingOffsets`` + * ``endingOffsetsByTimestamp`` + * ``kafka.*`` + * ``startingOffsets`` + * ``startingOffsetsByTimestamp`` + * ``startingOffsetsByTimestampStrategy`` + * ``startingTimestamp`` + * ``subscribe`` + * ``subscribePattern`` + * ``topic`` + + populated from connection attributes, and cannot be set in ``KafkaWriteOptions`` class and be overridden + by the user to avoid issues. Examples --------