From ba8e1fd4901a2dd4dbfaed011083b6356fe0a836 Mon Sep 17 00:00:00 2001 From: Pedchenko Dmitry Date: Tue, 18 Jul 2023 23:00:02 +0300 Subject: [PATCH] [DOP-7290] Implement KafkaAuth --- .../db_connection/kafka/connection.py | 23 +++++++------------ .../connection/db_connection/kafka/dialect.py | 15 ++++++++++++ .../kafka/{i_kafka_auth.py => ikafka_auth.py} | 0 .../db_connection/kafka/kafka_basic_auth.py | 5 +--- .../kafka/kafka_kerberos_auth.py | 2 +- .../connection/db_connection/kafka/options.py | 15 ++++++++++++ 6 files changed, 40 insertions(+), 20 deletions(-) rename onetl/connection/db_connection/kafka/{i_kafka_auth.py => ikafka_auth.py} (100%) diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 2fd4621da..bb6c82345 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -22,7 +22,7 @@ from onetl.connection.db_connection.db_connection import DBConnection from onetl.connection.db_connection.kafka.dialect import KafkaDialect -from onetl.connection.db_connection.kafka.i_kafka_auth import IKafkaAuth +from onetl.connection.db_connection.kafka.ikafka_auth import IKafkaAuth from onetl.connection.db_connection.kafka.kafka_basic_auth import KafkaBasicAuth from onetl.connection.db_connection.kafka.kafka_kerberos_auth import KafkaKerberosAuth from onetl.connection.db_connection.kafka.options import ( @@ -57,10 +57,6 @@ class Kafka(DBConnection): Parameters ---------- - auth : IKafkaAuth, default: ``None`` - An attribute that contains a class that generates a Kafka connection configuration. - It depends on the type of connection to Kafka. - addresses : list[str] A list of broker addresses, for example ``[192.168.1.10:9092, 192.168.1.11:9092]``. The list cannot be empty. @@ -68,6 +64,10 @@ class Kafka(DBConnection): cluster : Cluster Cluster name. Used for HWM and lineage. A cluster field cannot be empty. + auth : IKafkaAuth, default: ``None`` + An attribute that contains a class that generates a Kafka connection configuration. + It depends on the type of connection to Kafka. + .. warning:: When creating a connector, when specifying `user` parameter, either `password` or `keytab` can be specified. Or @@ -118,16 +118,12 @@ class Kafka(DBConnection): BasicAuth = KafkaBasicAuth KerberosAuth = KafkaKerberosAuth - - auth: Optional[IKafkaAuth] = None - - addresses: List[str] - cluster: Cluster - ReadOptions = KafkaReadOptions WriteOptions = KafkaWriteOptions - Dialect = KafkaDialect + addresses: List[str] + cluster: Cluster + auth: Optional[IKafkaAuth] = None def read_source_as_df( # type: ignore self, @@ -161,9 +157,6 @@ def instance_url(self): def check(self): return self - def save_df(self, df: DataFrame, table: str) -> None: - ... - def get_min_max_bounds( # type: ignore self, table: str, diff --git a/onetl/connection/db_connection/kafka/dialect.py b/onetl/connection/db_connection/kafka/dialect.py index 4464381e4..885f2ea8d 100644 --- a/onetl/connection/db_connection/kafka/dialect.py +++ b/onetl/connection/db_connection/kafka/dialect.py @@ -1,3 +1,18 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + from __future__ import annotations import logging diff --git a/onetl/connection/db_connection/kafka/i_kafka_auth.py b/onetl/connection/db_connection/kafka/ikafka_auth.py similarity index 100% rename from onetl/connection/db_connection/kafka/i_kafka_auth.py rename to onetl/connection/db_connection/kafka/ikafka_auth.py diff --git a/onetl/connection/db_connection/kafka/kafka_basic_auth.py b/onetl/connection/db_connection/kafka/kafka_basic_auth.py index e66a606ee..38e778241 100644 --- a/onetl/connection/db_connection/kafka/kafka_basic_auth.py +++ b/onetl/connection/db_connection/kafka/kafka_basic_auth.py @@ -18,7 +18,7 @@ from pydantic import Field, SecretStr -from onetl.connection.db_connection.kafka.i_kafka_auth import IKafkaAuth +from onetl.connection.db_connection.kafka.ikafka_auth import IKafkaAuth from onetl.impl import GenericOptions if TYPE_CHECKING: @@ -35,9 +35,6 @@ class KafkaBasicAuth(IKafkaAuth, GenericOptions): user: str = Field(alias="username") password: SecretStr - class Config: - allow_extra = True - def get_jaas_conf(self) -> str: return dedent( f"""\ diff --git a/onetl/connection/db_connection/kafka/kafka_kerberos_auth.py b/onetl/connection/db_connection/kafka/kafka_kerberos_auth.py index 7552deb96..de9167089 100644 --- a/onetl/connection/db_connection/kafka/kafka_kerberos_auth.py +++ b/onetl/connection/db_connection/kafka/kafka_kerberos_auth.py @@ -21,7 +21,7 @@ from pydantic import Field, validator from onetl._internal import to_camel # noqa: WPS436 -from onetl.connection.db_connection.kafka.i_kafka_auth import IKafkaAuth +from onetl.connection.db_connection.kafka.ikafka_auth import IKafkaAuth from onetl.impl import GenericOptions, LocalPath, path_repr if TYPE_CHECKING: diff --git a/onetl/connection/db_connection/kafka/options.py b/onetl/connection/db_connection/kafka/options.py index f6b493f96..e7ed90411 100644 --- a/onetl/connection/db_connection/kafka/options.py +++ b/onetl/connection/db_connection/kafka/options.py @@ -1,3 +1,18 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + from onetl.impl import GenericOptions PROHIBITED_OPTIONS = frozenset(