-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsrv_api.py
132 lines (116 loc) · 5.44 KB
/
srv_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/python3
#####=====----- TEMPORAL for WinDev -----=====#####
import sys
sys.path.append('VENV\\Lib\\site-packages')
###################################################
import json
import uuid
from time import time
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base, Session
import jwt
''' =====----- Global variables -----====='''
# Настройки для SQLAlchemy
DB_PATH = 'sqlite:///sqlite/db.sqlite3'
Base = declarative_base()
ENGINE = sa.create_engine(DB_PATH)
# Время жизни access-token
ACC_TTL = 600.0
# Время жизни refresh-token
REF_TTL = 3600.0
''' =====----- Classes -----===== '''
class User(Base):
__tablename__ = 'Users'
uid = sa.Column(sa.String(36), primary_key=True)
admin = sa.Column(sa.Boolean)
name = sa.Column(sa.String(1024))
login = sa.Column(sa.String(1024))
password = sa.Column(sa.String(1024))
acc_token = sa.Column(sa.String(36))
acc_expired = sa.Column(sa.Float)
ref_token = sa.Column(sa.String(36))
ref_expired = sa.Column(sa.Float)
comment = sa.Column(sa.Text(1024))
''' =====----- Decorators -----===== '''
def auth_decor(fn_to_be_decor):
''' Декоратор для функций, которые в именованном аргументе 'req_data'
получают данные в виде JSON Web Token.
Распаковывает JWT, проверяет по базе наличие действительного
access-tokena, по результату передаёт декорируемой функции
именованный аргумент 'auth_ok' [bool] и полезную нагрузку в
именованном аргументе 'payload'.
'''
def fn_wrapper(**kwargs):
ok_ = False
payload_ = dict()
if 'req_data' in kwargs.keys():
decoded_jwt_ = jwt.api_jwt.decode_complete(kwargs['req_data'],
key=JWT_KEY,
algorithms='HS256')
##### header_ = decoded_jwt_['header']
##### token_ = header_['acc_token']
token_ = decoded_jwt_['header']['acc_token']
payload_ = decoded_jwt_['payload']
try:
with Session(ENGINE) as s_:
user_ = s_.query(User).filter(User.acc_token == token_).first()
try:
if user_.expired > time():
# Время действия токена не закончилось
ok_ = True
except:
# Токен закончился или его вообще нет
ok_ = False
except:
# Что-то не так с БД
ok_ = False
# Декорируемая функция
result_ = fn_to_be_decor(auth_ok=ok_, payload=payload_, **kwargs)
return result_
return fn_wrapper
''' =====----- API Methods -----===== '''
def login_getpost(credentials: dict) -> dict:
''' Метод для аутентификации на сервере. При логине пользователя
записывает ему в таблицу "Users" выданные access-token и refresh-token
и время окончания их действия "acc_expired" и "ref_expired".
Arguments:
credentials [dict] -- Словарь/json с ключами "login", "password"
Returns:
[dict] -- Словарь/json с ключами "status", "text", "acc_token",
"acc_expired", "ref_token", "ref_expired"
или с ключами "status", "text" в случае ошибки
'''
output_dict_ = {'status': 'fail',
'text': 'Unknown request'
}
try:
with Session(ENGINE) as s_:
login_ = credentials['login']
password_ = credentials['password']
user_ = s_.query(User).filter(User.login == login_).first()
if user_:
if user_.password == password_:
acc_token_ = str(uuid.uuid4())
ref_token_ = str(uuid.uuid4())
# Обновление пользователя в базе
user_.acc_token = acc_token_
user_.ref_token = ref_token_
user_.acc_expired = time() + ACC_TTL
user_.ref_expired = time() + REF_TTL
s_.add(user_)
s_.commit()
# Формирование ответа
output_dict_['status'] = 'success'
output_dict_['text'] = f'User {login_}: logged in'
output_dict_['acc_token'] = acc_token_
output_dict_['acc_expired'] = user_.acc_expired
output_dict_['ref_token'] = ref_token_
output_dict_['ref_expired'] = user_.ref_expired
else:
output_dict_['text'] = f'User {login_}: login failed'
else:
output_dict_['text'] = f'User {login_}: not exists'
except Exception as e_:
print(e_)
return json.dumps(output_dict_, ensure_ascii=False, indent=2)
#####=====----- THE END -----=====#########################################