-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathairflow.py
65 lines (55 loc) · 2.67 KB
/
airflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import logging
import aiohttp
from pydantic import AnyUrl
from auth_backend.auth_method.outer import ConnectionIssue, OuterAuthMeta
from auth_backend.settings import Settings
logger = logging.getLogger(__name__)
class AirflowOuterAuthSettings(Settings):
AIRFLOW_AUTH_BASE_URL: AnyUrl | None = None
AIRFLOW_AUTH_ADMIN_USERNAME: str | None = None
AIRFLOW_AUTH_ADMIN_PASSWORD: str | None = None
class AirflowOuterAuth(OuterAuthMeta):
prefix = '/airflow'
settings = AirflowOuterAuthSettings()
@classmethod
async def _is_outer_user_exists(cls, username: str) -> bool:
"""Проверяет наличие пользователя в Airflow"""
logger.debug("_is_outer_user_exists class=%s started", cls.get_name())
async with aiohttp.ClientSession() as session:
async with session.get(
str(cls.settings.AIRFLOW_AUTH_BASE_URL).removesuffix('/') + '/auth/fab/v1/users/' + username,
auth=aiohttp.BasicAuth(
cls.settings.AIRFLOW_AUTH_ADMIN_USERNAME, cls.settings.AIRFLOW_AUTH_ADMIN_PASSWORD
),
) as response:
if not response.ok:
raise ConnectionIssue(response.text)
res: dict[str] = await response.json()
return res.get('username') == username
@classmethod
async def _update_outer_user_password(cls, username: str, password: str):
"""Устанавливает пользователю новый пароль в Airflow"""
logger.debug("_update_outer_user_password class=%s started", cls.get_name())
res = False
async with aiohttp.ClientSession() as session:
async with session.patch(
str(cls.settings.AIRFLOW_AUTH_BASE_URL).removesuffix('/') + '/auth/fab/v1/users/' + username,
auth=aiohttp.BasicAuth(
cls.settings.AIRFLOW_AUTH_ADMIN_USERNAME, cls.settings.AIRFLOW_AUTH_ADMIN_PASSWORD
),
params={"update_mask": ["password"]},
json={
"password": password,
"email": "no_change",
"first_name": "no_change",
"last_name": "no_change",
"roles": [],
"username": "no_change",
},
) as response:
res = response.ok
logger.debug("_update_outer_user_password class=%s response %s", cls.get_name(), str(response.status))
if res:
logger.info("User %s updated in Airflow", username)
else:
logger.error("User %s can't be updated in Airflow. Error: %s", username, res)