-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkafka.py
104 lines (83 loc) · 3.19 KB
/
kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
from functools import lru_cache
from typing import Any
from confluent_kafka import KafkaError, KafkaException, Message, Producer
from event_schema.auth import UserLogin, UserLoginKey
from auth_backend import __version__
from auth_backend.kafka.kafkameta import KafkaMeta
from auth_backend.settings import get_settings
log = logging.getLogger(__name__)
class Kafka(KafkaMeta):
"""
Класс для работы с Kafka
"""
__dsn = get_settings().KAFKA_DSN
__devel: bool = True if __version__ == "dev" else False
__conf: dict[str, str] = {}
__timeout: int = get_settings().KAFKA_TIMEOUT
__login: str | None = get_settings().KAFKA_LOGIN
__password: str | None = get_settings().KAFKA_PASSWORD
_producer: Producer
def __configurate(self) -> None:
if self.__devel:
self.__conf = {"bootstrap.servers": self.__dsn}
else:
self.__conf = {
'bootstrap.servers': self.__dsn,
'sasl.mechanisms': "PLAIN",
'security.protocol': "SASL_PLAINTEXT",
'sasl.username': self.__login,
'sasl.password': self.__password,
}
def __init__(self) -> None:
self.__configurate()
self._producer = Producer(self.__conf)
log.info("Kafka init done")
def delivery_callback(self, err: KafkaError, msg: Message) -> None:
"""
Args:
err: произошедшая ошибка при доставке
msg: доставленное сообщение
Returns:
Ничего
"""
if err:
log.error('%% Message failed delivery: %s\n' % err)
else:
log.info('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))
def produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:
"""
Отправляет сообщение в Kafka
Args:
topic: топик в который будет написано сообщение
key: ключ сообщения
value: значение сообщение
Returns:
Ничего
"""
if topic not in self._producer.list_topics().topics:
log.warning(f"Message {key=}, {value=} skipped due to {topic=} don't exists")
return
try:
self._producer.produce(
topic, key=key.model_dump_json(), value=value.model_dump_json(), callback=self.delivery_callback
)
except KafkaException:
log.critical("Kafka is down")
self._producer.poll(0)
def close(self) -> None:
self._producer.flush()
class KafkaMock(KafkaMeta):
def produce(self, topic: str, key: Any, value: Any) -> Any:
log.debug(f"Kafka cluster disabled, debug msg: {topic=}, {key=}, {value=}")
def close(self) -> None:
return
@lru_cache
def get_kafka_producer() -> KafkaMeta:
"""
Возвращает реальный клиент кафки, если задан ``KAFKA_DSN``,
иначе Mock кафки
"""
if get_settings().KAFKA_DSN:
return Kafka()
return KafkaMock()