Skip to content

Commit

Permalink
fix: bugs, refactor changes, remove unneccessary dependency to flask_…
Browse files Browse the repository at this point in the history
…login
  • Loading branch information
hiddify-com committed Jan 2, 2024
1 parent 83c41d4 commit a2c0022
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 153 deletions.
5 changes: 3 additions & 2 deletions hiddifypanel/panel/admin/UserAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ def _name_formatter(view, context, model, name):
else:
link = '<i class="fa-solid fa-circle-xmark text-danger"></i> '

d = get_panel_domains()[0]
d = request.host
client_proxy_path = hconfig(ConfigEnum.proxy_path_client)
link += f"<a target='_blank' href='https://{model.username}:{model.password}@{d}/{client_proxy_path}/#{model.name}'>{model.name} <i class='fa-solid fa-arrow-up-right-from-square'></i></a>"
link += f"<a target='_blank' class='copy-link' href='https://{d}/{client_proxy_path}/{model.uuid}/#{model.name}'>{model.name} <i class='fa-solid fa-arrow-up-right-from-square'></i></a>"
# link += f"<a target='_blank' class='copy-link' href='https://{model.username}:{model.password}@{d}/{client_proxy_path}/#{model.name}'>{model.name} <i class='fa-solid fa-arrow-up-right-from-square'></i></a>"
return Markup(extra+link)

def _ul_formatter(view, context, model, name):
Expand Down
231 changes: 115 additions & 116 deletions hiddifypanel/panel/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,118 +9,141 @@
import hiddifypanel.panel.hiddify as hiddify
from hiddifypanel import hutils

from werkzeug.local import LocalProxy
current_account = LocalProxy(lambda: _get_user())

class CustumLoginManager(LoginManager):
def _load_user(self):
if self._user_callback is None and self._request_callback is None:
raise Exception(
"Missing user_loader or request_loader. Refer to "
"http://flask-login.readthedocs.io/#how-it-works "
"for more info."
)

user_accessed.send(current_app._get_current_object()) # type: ignore
def _get_user():
if not hasattr(g, "account"):
g.account = None
return g.account

# Check SESSION_PROTECTION
if self._session_protection_failed():
return self._update_request_context_with_user()

user = None

account_id = ''
# Load user from Flask Session
if hiddify.is_api_call(request.path):
if hiddify.is_user_api_call():
account_id = session.get("_user_id")
else:
account_id = session.get("_admin_id")
elif hiddify.is_user_panel_call():
account_id = session.get("_user_id")
else:
account_id = session.get("_admin_id")

if account_id is not None and self._user_callback is not None:
user = self._user_callback(account_id)

# Load user from Remember Me Cookie or Request Loader
if user is None:
config = current_app.config
cookie_name = config.get("REMEMBER_COOKIE_NAME", COOKIE_NAME)
header_name = config.get("AUTH_HEADER_NAME", AUTH_HEADER_NAME)
has_cookie = (
cookie_name in request.cookies and session.get("_remember") != "clear"
)
if header_name in request.headers:
header = request.headers[header_name]
user = self._load_user_from_header(header)
elif self._request_callback:
user = self._load_user_from_request(request)
elif has_cookie:
cookie = request.cookies[cookie_name]
user = self._load_user_from_remember_cookie(cookie)

return self._update_request_context_with_user(user)
def logout_user():
g.account = None
if '_user_id' in session:
session.pop('_user_id')
if '_admin_id' in session:
session.pop('_admin_id')


def login_user(user: AdminUser | User, remember=False, duration=None, force=False, fresh=True):
if not force and not user.is_active:
g.account = user
if not user.is_active:
return False

account_id = getattr(user, current_app.login_manager.id_attribute)() # type: ignore
account_id = user.get_id() # type: ignore
if user.role in {Role.super_admin, Role.admin, Role.agent}:
session["_admin_id"] = account_id
else:
session["_user_id"] = account_id
session["_fresh"] = fresh
session["_id"] = current_app.login_manager._session_identifier_generator() # type: ignore

if remember:
session["_remember"] = "set"
if duration is not None:
try:
# equal to timedelta.total_seconds() but works with Python 2.6
session["_remember_seconds"] = (
duration.microseconds
+ (duration.seconds + duration.days * 24 * 3600) * 10**6
) / 10.0**6
except AttributeError as e:
raise Exception(
f"duration must be a datetime.timedelta, instead got: {duration}"
) from e

current_app.login_manager._update_request_context_with_user(user) # type: ignore
user_logged_in.send(current_app._get_current_object(), user=_get_user()) # type: ignore
# session["_fresh"] = fresh
# session["_id"] = current_app.login_manager._session_identifier_generator() # type: ignore

# if remember:
# session["_remember"] = "set"
# if duration is not None:
# try:
# # equal to timedelta.total_seconds() but works with Python 2.6
# session["_remember_seconds"] = (
# duration.microseconds
# + (duration.seconds + duration.days * 24 * 3600) * 10**6
# ) / 10.0**6
# except AttributeError as e:
# raise Exception(
# f"duration must be a datetime.timedelta, instead got: {duration}"
# ) from e

# current_app.login_manager._update_request_context_with_user(user) # type: ignore
# user_logged_in.send(current_app._get_current_object(), user=_get_user()) # type: ignore
return True


def login_required(roles: set[Role] | None = None):
def wrapper(fn):
@wraps(fn)
def decorated_view(*args, **kwargs):
if not current_user.is_authenticated:
return current_app.login_manager.unauthorized() # type: ignore
if not current_account:
return redirect_to_login() # type: ignore
if roles:
account_role = current_user.role
account_role = current_account.role
if account_role not in roles:
return current_app.login_manager.unauthorized() # type: ignore
return redirect_to_login() # type: ignore
return fn(*args, **kwargs)
return decorated_view
return wrapper


def get_account_by_api_key(api_key, is_admin):
return AdminUser.by_uuid(api_key) if is_admin else User.by_uuid(api_key)


def get_account_by_uuid(uuid, is_admin):
return AdminUser.by_uuid(uuid) if is_admin else User.by_uuid(uuid)


def init_app(app):
# login_manager = LoginManager()
login_manager = CustumLoginManager()
login_manager.init_app(app)
# login_manager = LoginManager.()
# login_manager = CustumLoginManager()
# login_manager.init_app(app)

@app.before_request
def auth():
account = None

is_admin_path = hiddify.is_admin_proxy_path()
next_url = None

@login_manager.user_loader
if auth_header := request.headers.get("Authorization"):
apikey = hutils.utils.get_apikey_from_auth_header(auth_header)
account = get_account_by_api_key(apikey, is_admin_path)
if not account:
logout_user()

if not account and (uuid := hutils.utils.get_uuid_from_url_path(request.path)):
account = get_account_by_uuid(uuid, is_admin_path)
print("---------", account)
if not account:
logout_user()
else:
next_url = request.url.replace(f'/{uuid}/', '/' if is_admin_path else '/client/')

if not account and request.authorization:
uname = request.authorization.username
pword = request.authorization.password
account = AdminUser.by_username_password(uname, pword) if is_admin_path else User.by_username_password(uname, pword)
print(request.authorization, account)
if not account:
logout_user()
if not account and (session_user := session.get('_user_id')):
account = User.by_id(int(session_user.split("_")[1])) # type: ignore
if not account:
logout_user()
if not account and (session_admin := session.get('_admin_id')):
account = AdminUser.by_id(int(session_admin.split("_")[1])) # type: ignore

if account:
g.account = account
# g.account_uuid = account.uuid
g.is_admin = hiddify.is_admin_role(account.role) # type: ignore
login_user(account, force=True)
print("loggining in")
if next_url is not None:
if 0 and g.user_agent.browser:
return redirect(next_url)
else:
request.url = next_url
return app.dispatch_request()

# @login_manager.user_loader
def cookie_auth(id: str) -> User | AdminUser | None:
if not hiddify.is_api_call(request.path):
# if request.headers.get("Authorization"):
# return header_auth(request)
# for handle new login
if hiddify.is_login_call():
return header_auth(request)
# if not hiddify.is_api_call(request.path):
# # if request.headers.get("Authorization"):
# # return header_auth(request)
# # for handle new login
# if hiddify.is_login_call():
# # print("DDDDDDDDDDDDDDD-")
# return header_auth(request)

# parse id
acc_type, id = hutils.utils.parse_login_id(id) # type: ignore
Expand All @@ -138,41 +161,17 @@ def cookie_auth(id: str) -> User | AdminUser | None:
g.is_admin = hiddify.is_admin_role(account.role)
return account

@login_manager.request_loader
def header_auth(request) -> User | AdminUser | None:
auth_header: str = request.headers.get("Authorization")
if not auth_header:
return

account = None
is_api_call = False

if hiddify.is_api_call(request.path):
if apikey := hutils.utils.get_apikey_from_auth_header(auth_header):
account = User.by_uuid(apikey) or AdminUser.by_uuid(apikey)
is_api_call = True
else:
uname = request.authorization.username
pword = request.authorization.password
account = AdminUser.by_username_password(uname, pword) if hiddify.is_admin_proxy_path() else User.by_username_password(uname, pword)

if account:
g.account = account
# g.account_uuid = account.uuid
g.is_admin = hiddify.is_admin_role(account.role) # type: ignore
login_user(account)
else:
logout_user()
# @login_manager.unauthorized_handler

return account

@login_manager.unauthorized_handler
def unauthorized():
# TODO: show the login page
# return request.base_url
if g.user_agent.browser:
return redirect(url_for('common_bp.LoginView:basic_0', force=1, next={request.path}))
def redirect_to_login():
# TODO: show the login page
# return request.base_url
if g.user_agent.browser:
return redirect(url_for('common_bp.LoginView:basic_0', force=1, next=request.path))

else:
abort(401, "Unauthorized")
else:
abort(401, "Unauthorized")
# return f'/{request.path.split("/")[1]}/?force=1&redirect={request.path}'

# @login_manager.request_loader
Loading

0 comments on commit a2c0022

Please sign in to comment.