-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgoogle.py
161 lines (137 loc) · 6.55 KB
/
google.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import logging
import google_auth_oauthlib.flow
from google.oauth2.id_token import verify_oauth2_token
from google.auth.exceptions import GoogleAuthError
from fastapi import Depends
from fastapi_sqlalchemy import db
import oauthlib.oauth2.rfc6749.errors
from pydantic import BaseModel, Field, Json
from google.auth.transport import requests
from auth_backend.models.db import AuthMethod, User, UserSession
from auth_backend.exceptions import AlreadyExists, OauthAuthFailed, OauthCredentialsIncorrect
from auth_backend.settings import Settings
from auth_backend.utils.security import UnionAuth
from .auth_method import OauthMeta, Session
logger = logging.getLogger(__name__)
auth = UnionAuth(auto_error=False)
class GoogleSettings(Settings):
GOOGLE_REDIRECT_URL: str = 'https://app.test.profcomff.com/auth/oauth-authorized/google'
GOOGLE_SCOPES: list[str] = ['openid', 'https://www.googleapis.com/auth/userinfo.profile']
GOOGLE_CREDENTIALS: Json = '{}'
class GoogleAuth(OauthMeta):
"""Вход в приложение по аккаунту гугл"""
prefix = '/google'
tags = ['Google']
fields = ["code", "scope"]
settings = GoogleSettings()
class OauthResponseSchema(BaseModel):
code: str | None
scope: str | None
state: str | None
id_token: str | None = Field(help="Google JWT token identifier")
def __init__(self):
super().__init__()
@classmethod
async def _register(
cls,
user_inp: OauthResponseSchema,
user_session: UserSession | None = Depends(auth),
) -> Session:
"""Создает аккаунт или привязывает существующий
Если передана активная сессия пользователя, то привязывает аккаунт Google к аккаунту в
активной сессии. Иначе, создает новый пользователь и делает Google первым методом входа.
"""
if not user_inp.id_token:
flow = await cls._default_flow()
try:
credentials = flow.fetch_token(**user_inp.dict(exclude_unset=True))
except oauthlib.oauth2.rfc6749.errors.InvalidGrantError as exc:
raise OauthCredentialsIncorrect(f'Google account response invalid: {exc}')
id_token = credentials.get("id_token")
else:
id_token = user_inp.id_token
try:
guser_id = verify_oauth2_token(
id_token,
requests.Request(),
cls.settings.GOOGLE_CREDENTIALS['web']['client_id'],
)
except GoogleAuthError as exc:
raise OauthCredentialsIncorrect(f'Google account response invalid: {exc}')
user = await cls._get_user(guser_id, db_session=db.session)
if user is not None:
raise AlreadyExists(user, user.id)
if user_session is None:
user = await cls._create_user(db_session=db.session) if user_session is None else user_session.user
else:
user = user_session.user
await cls._register_auth_method(guser_id, user, db_session=db.session)
return await cls._create_session(user, db_session=db.session)
@classmethod
async def _login(cls, user_inp: OauthResponseSchema):
"""Вход в пользователя с помощью аккаунта Google
Производит вход, если находит пользователя по Google client_id. Если аккаунт не найден,
возвращает ошибка.
"""
flow = await cls._default_flow()
try:
credentials = flow.fetch_token(**user_inp.dict(exclude_unset=True))
except oauthlib.oauth2.rfc6749.errors.OAuth2Error as exc:
raise OauthCredentialsIncorrect(f'Google account response invalid: {exc}')
try:
guser_id = verify_oauth2_token(
credentials.get("id_token"),
requests.Request(),
cls.settings.GOOGLE_CREDENTIALS['web']['client_id'],
)
except GoogleAuthError as exc:
raise OauthCredentialsIncorrect(f'Google account response invalid: {exc}')
user = await cls._get_user(guser_id, db_session=db.session)
if not user:
raise OauthAuthFailed('No users found for google account', id_token=credentials.get("id_token"))
return await cls._create_session(user, db_session=db.session)
@classmethod
async def _redirect_url(cls):
"""URL на который происходит редирект после завершения входа на стороне провайдера"""
return OauthMeta.UrlSchema(url=cls.settings.GOOGLE_REDIRECT_URL)
@classmethod
async def _auth_url(cls):
"""URL на который происходит редирект из приложения для авторизации на стороне провайдера"""
# Docs: https://developers.google.com/identity/protocols/oauth2/web-server#python_1
flow = await cls._default_flow()
authorization_url, _ = flow.authorization_url(
access_type='offline',
include_granted_scopes='true',
)
return OauthMeta.UrlSchema(url=authorization_url)
@classmethod
async def _default_flow(cls) -> google_auth_oauthlib.flow.Flow:
flow = google_auth_oauthlib.flow.Flow.from_client_config(
cls.settings.GOOGLE_CREDENTIALS,
scopes=cls.settings.GOOGLE_SCOPES,
)
flow.redirect_uri = cls.settings.GOOGLE_REDIRECT_URL
return flow
@classmethod
async def _register_auth_method(cls, guser_id, user: User, *, db_session):
"""Добавление пользователю новый AuthMethod"""
AuthMethod.create(
user_id=user.id,
auth_method=cls.get_name(),
param='unique_google_id',
value=guser_id['sub'],
session=db_session,
)
@classmethod
async def _get_user(cls, guser_id: dict[str], *, db_session: Session) -> User | None:
auth_method: AuthMethod = (
AuthMethod.query(session=db_session)
.filter(
AuthMethod.param == "unique_google_id",
AuthMethod.value == guser_id['sub'], # An identifier for the user, unique among all Google accounts
AuthMethod.auth_method == cls.get_name(),
)
.one_or_none()
)
if auth_method:
return auth_method.user