Skip to content

Commit

Permalink
[DOP-7290] Implement KafkaAuth
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-pedchenko committed Jul 18, 2023
1 parent 5589af6 commit ba8e1fd
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 20 deletions.
23 changes: 8 additions & 15 deletions onetl/connection/db_connection/kafka/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from onetl.connection.db_connection.db_connection import DBConnection
from onetl.connection.db_connection.kafka.dialect import KafkaDialect
from onetl.connection.db_connection.kafka.i_kafka_auth import IKafkaAuth
from onetl.connection.db_connection.kafka.ikafka_auth import IKafkaAuth
from onetl.connection.db_connection.kafka.kafka_basic_auth import KafkaBasicAuth
from onetl.connection.db_connection.kafka.kafka_kerberos_auth import KafkaKerberosAuth
from onetl.connection.db_connection.kafka.options import (
Expand Down Expand Up @@ -57,17 +57,17 @@ class Kafka(DBConnection):
Parameters
----------
auth : IKafkaAuth, default: ``None``
An attribute that contains a class that generates a Kafka connection configuration.
It depends on the type of connection to Kafka.
addresses : list[str]
A list of broker addresses, for example ``[192.168.1.10:9092, 192.168.1.11:9092]``.
The list cannot be empty.
cluster : Cluster
Cluster name. Used for HWM and lineage. A cluster field cannot be empty.
auth : IKafkaAuth, default: ``None``
An attribute that contains a class that generates a Kafka connection configuration.
It depends on the type of connection to Kafka.
.. warning::
When creating a connector, when specifying `user` parameter, either `password` or `keytab` can be specified. Or
Expand Down Expand Up @@ -118,16 +118,12 @@ class Kafka(DBConnection):

BasicAuth = KafkaBasicAuth
KerberosAuth = KafkaKerberosAuth

auth: Optional[IKafkaAuth] = None

addresses: List[str]
cluster: Cluster

ReadOptions = KafkaReadOptions
WriteOptions = KafkaWriteOptions

Dialect = KafkaDialect
addresses: List[str]
cluster: Cluster
auth: Optional[IKafkaAuth] = None

def read_source_as_df( # type: ignore
self,
Expand Down Expand Up @@ -161,9 +157,6 @@ def instance_url(self):
def check(self):
return self

def save_df(self, df: DataFrame, table: str) -> None:
...

def get_min_max_bounds( # type: ignore
self,
table: str,
Expand Down
15 changes: 15 additions & 0 deletions onetl/connection/db_connection/kafka/dialect.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
# Copyright 2023 MTS (Mobile Telesystems)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import logging
Expand Down
5 changes: 1 addition & 4 deletions onetl/connection/db_connection/kafka/kafka_basic_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from pydantic import Field, SecretStr

from onetl.connection.db_connection.kafka.i_kafka_auth import IKafkaAuth
from onetl.connection.db_connection.kafka.ikafka_auth import IKafkaAuth
from onetl.impl import GenericOptions

if TYPE_CHECKING:
Expand All @@ -35,9 +35,6 @@ class KafkaBasicAuth(IKafkaAuth, GenericOptions):
user: str = Field(alias="username")
password: SecretStr

class Config:
allow_extra = True

def get_jaas_conf(self) -> str:
return dedent(
f"""\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pydantic import Field, validator

from onetl._internal import to_camel # noqa: WPS436
from onetl.connection.db_connection.kafka.i_kafka_auth import IKafkaAuth
from onetl.connection.db_connection.kafka.ikafka_auth import IKafkaAuth
from onetl.impl import GenericOptions, LocalPath, path_repr

if TYPE_CHECKING:
Expand Down
15 changes: 15 additions & 0 deletions onetl/connection/db_connection/kafka/options.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
# Copyright 2023 MTS (Mobile Telesystems)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from onetl.impl import GenericOptions

PROHIBITED_OPTIONS = frozenset(
Expand Down

0 comments on commit ba8e1fd

Please sign in to comment.